865 lines
36 KiB
Rust
865 lines
36 KiB
Rust
mod cryptographic_signature;
|
|
mod data;
|
|
mod datum_parsing;
|
|
mod fetchsocketaddresserror;
|
|
mod message_handling;
|
|
mod messages_channels;
|
|
mod messages_structure;
|
|
mod peers_refresh;
|
|
mod registration;
|
|
mod server_communication;
|
|
mod threads_handling;
|
|
mod timestamp;
|
|
|
|
use crate::fetchsocketaddresserror::FetchSocketAddressError;
|
|
use crate::messages_structure::ROOTREPLY;
|
|
use crate::peers_refresh::*;
|
|
use crate::timestamp::Timestamp;
|
|
use crate::{
|
|
cryptographic_signature::CryptographicSignature,
|
|
message_handling::EventType,
|
|
messages_channels::{MultipleSenders, start_receving_thread, start_retry_thread},
|
|
messages_structure::{
|
|
DATUM, DATUMREQUEST, NATTRAVERSALREQUEST, NODATUM, PING, ROOTREQUEST, construct_message,
|
|
},
|
|
peers_refresh::HandshakeHistory,
|
|
registration::{parse_addresses, perform_handshake, register_with_the_server},
|
|
server_communication::{generate_id, get_peer_list},
|
|
threads_handling::Worker,
|
|
};
|
|
use std::{
|
|
io::Error,
|
|
net::{IpAddr, UdpSocket},
|
|
time::Duration,
|
|
};
|
|
use std::{
|
|
net::SocketAddr,
|
|
sync::{Arc, Mutex},
|
|
};
|
|
|
|
pub struct P2PSharedData {
|
|
shared_socket: Arc<UdpSocket>,
|
|
shared_cryptopair: Arc<CryptographicSignature>,
|
|
shared_messageslist: Arc<Mutex<HashMap<i32, EventType>>>,
|
|
shared_messagesreceived: Arc<Mutex<HashMap<String, (EventType, Timestamp)>>>,
|
|
shared_senders: Arc<MultipleSenders>,
|
|
server_name: Arc<Mutex<String>>,
|
|
server_address: Arc<Mutex<String>>,
|
|
handshake_peers: Arc<HandshakeHistory>,
|
|
threads: Vec<Worker>,
|
|
}
|
|
|
|
use bytes::Bytes;
|
|
use reqwest::Client;
|
|
use tokio::time::sleep;
|
|
|
|
impl P2PSharedData {
|
|
pub fn new(
|
|
username: String,
|
|
cmd_tx: crossbeam_channel::Sender<NetworkEvent>,
|
|
) -> Result<P2PSharedData, Error> {
|
|
let messages_list = HashMap::<i32, EventType>::new();
|
|
let messagesrecv_list = HashMap::<String, (EventType, Timestamp)>::new();
|
|
let username = String::from(username);
|
|
let crypto_pair = CryptographicSignature::new(username);
|
|
let socket = UdpSocket::bind("0.0.0.0:0")?;
|
|
let shared_socket = Arc::new(socket);
|
|
let shared_cryptopair = Arc::new(crypto_pair);
|
|
let shared_messageslist = Arc::new(Mutex::new(messages_list));
|
|
let shared_messagesreceived = Arc::new(Mutex::new(messagesrecv_list));
|
|
|
|
let mut threads = Vec::new();
|
|
|
|
let senders = MultipleSenders::new(
|
|
5,
|
|
&shared_socket,
|
|
cmd_tx,
|
|
&mut threads,
|
|
shared_messageslist.clone(),
|
|
);
|
|
let shared_senders = Arc::new(senders);
|
|
let server_name = Arc::new(Mutex::new("".to_string()));
|
|
let server_address = Arc::new(Mutex::new("".to_string()));
|
|
let handhsake_peers = Arc::new(HandshakeHistory::new());
|
|
|
|
Ok(P2PSharedData {
|
|
shared_socket: shared_socket,
|
|
shared_cryptopair: shared_cryptopair,
|
|
shared_messageslist: shared_messageslist,
|
|
shared_messagesreceived: shared_messagesreceived,
|
|
shared_senders: shared_senders,
|
|
server_name: server_name,
|
|
server_address: server_address,
|
|
handshake_peers: handhsake_peers,
|
|
threads,
|
|
})
|
|
}
|
|
pub fn socket(&self) -> Arc<UdpSocket> {
|
|
self.shared_socket.clone()
|
|
}
|
|
|
|
pub fn cryptopair(&self) -> Arc<CryptographicSignature> {
|
|
self.shared_cryptopair.clone()
|
|
}
|
|
pub fn messages_list(&self) -> Arc<Mutex<HashMap<i32, EventType>>> {
|
|
self.shared_messageslist.clone()
|
|
}
|
|
pub fn messages_received(&self) -> Arc<Mutex<HashMap<String, (EventType, Timestamp)>>> {
|
|
self.shared_messagesreceived.clone()
|
|
}
|
|
pub fn servername(&self) -> String {
|
|
let guard = {
|
|
let maybe_sn = self.server_name.lock().unwrap();
|
|
maybe_sn.clone()
|
|
};
|
|
guard.to_string()
|
|
}
|
|
pub fn serveraddress(&self) -> String {
|
|
let guard = {
|
|
let maybe_sn = self.server_address.lock().unwrap();
|
|
maybe_sn.clone()
|
|
};
|
|
guard.to_string()
|
|
}
|
|
pub fn set_servername(&self, new: String) {
|
|
let mut guard = self.server_name.lock().unwrap();
|
|
*guard = new
|
|
}
|
|
pub fn set_serveraddress(&self, new: String) {
|
|
let mut guard = self.server_address.lock().unwrap();
|
|
*guard = new
|
|
}
|
|
pub fn senders(&self) -> Arc<MultipleSenders> {
|
|
self.shared_senders.clone()
|
|
}
|
|
pub fn socket_ref(&self) -> &UdpSocket {
|
|
&*self.shared_socket
|
|
}
|
|
pub fn handshakes(&self) -> Arc<HandshakeHistory> {
|
|
self.handshake_peers.clone()
|
|
}
|
|
pub fn cryptopair_ref(&self) -> &CryptographicSignature {
|
|
&*self.shared_cryptopair
|
|
}
|
|
|
|
pub fn messages_list_ref(&self) -> &Mutex<HashMap<i32, EventType>> {
|
|
&*self.shared_messageslist
|
|
}
|
|
|
|
pub fn messages_received_ref(&self) -> &Mutex<HashMap<String, (EventType, Timestamp)>> {
|
|
&*self.shared_messagesreceived
|
|
}
|
|
|
|
pub fn senders_ref(&self) -> &MultipleSenders {
|
|
&*self.shared_senders
|
|
}
|
|
|
|
pub fn add_message(&self, id: i32, evt: EventType) {
|
|
let mut map = self.shared_messageslist.lock().unwrap();
|
|
map.insert(id, evt);
|
|
}
|
|
pub fn threads(&mut self) -> &mut Vec<Worker> {
|
|
&mut self.threads
|
|
}
|
|
|
|
pub fn close_threads(&mut self) {
|
|
for w in self.threads.drain(..) {
|
|
w.stop();
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Messages sent to the Network thread by the GUI.
|
|
pub enum NetworkCommand {
|
|
ConnectToServerPut(String, String), // ServerIP
|
|
ServerHandshake(String, String), // ServerName
|
|
FetchPeerList(String), // ServerIP
|
|
RegisterAsPeer(String),
|
|
Ping(String, String),
|
|
NatTraversal(String, String),
|
|
ConnectPeer((String, bool)), // IP:PORT
|
|
RequestFileTree(String), // peer_id
|
|
RequestDirectoryContent(String, String),
|
|
RequestChunk(String, String),
|
|
Disconnect(),
|
|
ResetServerPeer(),
|
|
Discover(String, String, String),
|
|
GetChildren([u8; 32], String, bool),
|
|
SendDatum(MerkleNode, [u8; 32], String),
|
|
SendNoDatum(Vec<u8>, String),
|
|
SendRootReply(Vec<u8>, String),
|
|
InitDownload([u8; 32], String, String),
|
|
// ...
|
|
}
|
|
|
|
/// Messages sent to the GUI by the Network thread.
|
|
pub enum NetworkEvent {
|
|
Connected(String),
|
|
ConnectedHandshake(),
|
|
Disconnected(),
|
|
Error(String, String),
|
|
Success(String, String),
|
|
PeerListUpdated(Vec<(String, bool)>),
|
|
FileTreeReceived([u8; 32], MerkleNode, String), // peer_id, content
|
|
DataReceived([u8; 32], MerkleNode, String),
|
|
FileTreeRootReceived(String, NodeHash),
|
|
HandshakeFailed(),
|
|
ServerHandshakeFailed(String),
|
|
DatumRequest([u8; 32], String),
|
|
RootRequest(String),
|
|
InitDownload([u8; 32], String, String),
|
|
// ...
|
|
}
|
|
|
|
use std::collections::HashMap;
|
|
|
|
pub use crate::data::*;
|
|
use crossbeam_channel::{Receiver, Sender};
|
|
use sha2::{Digest, Sha256};
|
|
|
|
pub fn calculate_chunk_id(data: &[u8]) -> String {
|
|
let mut hasher = Sha256::new();
|
|
|
|
hasher.update(data);
|
|
|
|
let hash_bytes = hasher.finalize();
|
|
|
|
hex::encode(hash_bytes)
|
|
}
|
|
|
|
pub fn start_p2p_executor(
|
|
cmd_rx: Receiver<NetworkCommand>,
|
|
event_tx: Sender<NetworkEvent>,
|
|
mut shared_data: Option<P2PSharedData>,
|
|
) -> tokio::task::JoinHandle<()> {
|
|
tokio::task::spawn(async move {
|
|
println!("Network executor started.");
|
|
|
|
// Main network loop
|
|
loop {
|
|
if let Ok(cmd) = cmd_rx.try_recv() {
|
|
match cmd {
|
|
NetworkCommand::InitDownload(hash, ip, name) => {
|
|
if let Some(sd) = shared_data.as_ref() {
|
|
if let Some(res) = sd.handshake_peers.get_peer_info_username(ip) {
|
|
let _ = event_tx.send(NetworkEvent::InitDownload(
|
|
hash,
|
|
res.ip.to_string(),
|
|
name.to_string(),
|
|
));
|
|
}
|
|
}
|
|
}
|
|
NetworkCommand::SendRootReply(node_hash, addr) => {
|
|
if let Some(sd) = shared_data.as_mut() {
|
|
let mut payload = Vec::new();
|
|
payload.extend_from_slice(&node_hash);
|
|
let new_id = generate_id();
|
|
let message =
|
|
construct_message(ROOTREPLY, payload, new_id, sd.cryptopair_ref());
|
|
|
|
match message {
|
|
None => {}
|
|
Some(resp_msg) => {
|
|
println!("msg_sent:{:?}", resp_msg);
|
|
sd.senders_ref()
|
|
.send_dispatch(resp_msg, addr.clone(), false);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
NetworkCommand::SendNoDatum(node_hash, addr) => {
|
|
if let Some(sd) = shared_data.as_mut() {
|
|
let mut payload = Vec::new();
|
|
payload.extend_from_slice(&node_hash);
|
|
let new_id = generate_id();
|
|
let message =
|
|
construct_message(NODATUM, payload, new_id, sd.cryptopair_ref());
|
|
|
|
match message {
|
|
None => {}
|
|
Some(resp_msg) => {
|
|
println!("msg_sent:{:?}", resp_msg);
|
|
sd.senders_ref()
|
|
.send_dispatch(resp_msg, addr.clone(), false);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
NetworkCommand::SendDatum(merklennode, node_hash, addr) => {
|
|
if let Some(sd) = shared_data.as_mut() {
|
|
let mut payload = Vec::new();
|
|
payload.extend_from_slice(&node_hash);
|
|
payload.extend_from_slice(&merklennode.serialize());
|
|
let new_id = generate_id();
|
|
let message =
|
|
construct_message(DATUM, payload, new_id, sd.cryptopair_ref());
|
|
|
|
match message {
|
|
None => {}
|
|
Some(resp_msg) => {
|
|
println!("msg_sent:{:?}", resp_msg);
|
|
sd.senders_ref()
|
|
.send_dispatch(resp_msg, addr.clone(), false);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
NetworkCommand::ServerHandshake(username, ip) => {
|
|
println!("server handshake called");
|
|
if let Some(sd) = shared_data.as_mut() {
|
|
start_receving_thread(sd, event_tx.clone(), sd.handshakes());
|
|
start_retry_thread(
|
|
sd.senders(),
|
|
4,
|
|
sd.messages_list(),
|
|
sd.threads().as_mut(),
|
|
);
|
|
update_handshake(
|
|
sd.senders(),
|
|
sd.cryptopair(),
|
|
sd.messages_list(),
|
|
sd.handshake_peers.get_username_peerinfo_map(),
|
|
);
|
|
let server_address = {
|
|
match get_server_address(username.to_owned(), ip.to_owned()).await {
|
|
Some(addr) => addr.to_string(),
|
|
None => {
|
|
match event_tx.send(NetworkEvent::Error(
|
|
"Couldn't fetch server socket address.".to_owned(),
|
|
username.to_owned(),
|
|
)) {
|
|
Ok(_) => {}
|
|
Err(e) => {
|
|
println!("Network Event Error : {}", e.to_string());
|
|
}
|
|
}
|
|
"".to_owned()
|
|
}
|
|
}
|
|
};
|
|
if server_address.to_owned().eq(&"".to_owned()) {
|
|
continue;
|
|
}
|
|
|
|
sd.set_servername(username.to_owned());
|
|
sd.set_serveraddress(server_address.to_string());
|
|
println!("SET SERVERADDRESS");
|
|
|
|
match perform_handshake(
|
|
&sd,
|
|
username.to_owned(),
|
|
ip,
|
|
event_tx.clone(),
|
|
(true, server_address.to_string()),
|
|
)
|
|
.await
|
|
{
|
|
true => {
|
|
match event_tx.send(NetworkEvent::Success(
|
|
"Handshake established ✔️".to_string(),
|
|
username.to_owned(),
|
|
)) {
|
|
Ok(_) => {}
|
|
Err(err) => {
|
|
println!("Network Event Error : {}", err.to_string());
|
|
}
|
|
};
|
|
}
|
|
false => {}
|
|
};
|
|
} else {
|
|
println!("no shared data");
|
|
}
|
|
}
|
|
NetworkCommand::ConnectPeer((username, _)) => {
|
|
println!("[Network] ConnectPeer() called");
|
|
println!("[Network] Attempting to connect to: {}", username);
|
|
}
|
|
NetworkCommand::RequestFileTree(_) => {
|
|
println!("[Network] RequestFileTree() called");
|
|
}
|
|
NetworkCommand::Discover(username, _, ip) => {
|
|
// envoie un handshake au peer, puis un root request
|
|
if let Some(sd) = shared_data.as_ref() {
|
|
let res = sd
|
|
.handshake_peers
|
|
.get_peer_info_username(username.to_owned());
|
|
match res {
|
|
Some(peerinfo) => {
|
|
let id = generate_id();
|
|
// envoyer un root request
|
|
let rootrequest = construct_message(
|
|
ROOTREQUEST,
|
|
Vec::new(),
|
|
id,
|
|
sd.cryptopair_ref(),
|
|
);
|
|
println!("matching");
|
|
|
|
match rootrequest {
|
|
None => {}
|
|
Some(resp_msg) => {
|
|
sd.add_message(id, EventType::RootRequest);
|
|
println!("msg_sent:{:?}", resp_msg);
|
|
sd.senders_ref().add_message_to_retry_queue(
|
|
resp_msg.clone(),
|
|
peerinfo.ip.to_string(),
|
|
false,
|
|
);
|
|
|
|
sd.senders_ref().send_dispatch(
|
|
resp_msg,
|
|
peerinfo.ip.to_string(),
|
|
false,
|
|
);
|
|
}
|
|
}
|
|
}
|
|
None => {
|
|
// envoyer un handshake
|
|
match perform_handshake(
|
|
&sd,
|
|
username.to_owned(),
|
|
ip,
|
|
event_tx.clone(),
|
|
(false, "".to_string()),
|
|
)
|
|
.await
|
|
{
|
|
true => {
|
|
match event_tx.send(NetworkEvent::Success(
|
|
"Handshake established ✔️".to_string(),
|
|
username.to_owned(),
|
|
)) {
|
|
Ok(_) => {}
|
|
Err(err) => {
|
|
println!(
|
|
"Network Event Error : {}",
|
|
err.to_string()
|
|
);
|
|
}
|
|
};
|
|
}
|
|
false => {}
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
println!("no shared data");
|
|
}
|
|
}
|
|
NetworkCommand::GetChildren(hash, ip, is_file) => {
|
|
if let Some(sd) = shared_data.as_ref() {
|
|
let mut payload = Vec::new();
|
|
payload.extend_from_slice(&hash);
|
|
let new_id = generate_id();
|
|
let datumreqest = construct_message(
|
|
DATUMREQUEST,
|
|
payload,
|
|
new_id,
|
|
sd.cryptopair_ref(),
|
|
);
|
|
match datumreqest {
|
|
None => {}
|
|
Some(resp_msg) => {
|
|
if is_file {
|
|
sd.add_message(new_id, EventType::DatumRequestBig);
|
|
} else {
|
|
sd.add_message(new_id, EventType::DatumRequest);
|
|
}
|
|
println!("msg_sent:{:?}", resp_msg);
|
|
sd.senders_ref().add_message_to_retry_queue(
|
|
resp_msg.clone(),
|
|
ip.clone(),
|
|
false,
|
|
);
|
|
|
|
sd.senders_ref().send_dispatch(resp_msg, ip.clone(), false);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
NetworkCommand::RequestDirectoryContent(_, _) => {
|
|
println!("[Network] RequestDirectoryContent() called");
|
|
}
|
|
NetworkCommand::RequestChunk(_, _) => {
|
|
println!("[Network] RequestChunk() called");
|
|
}
|
|
NetworkCommand::ConnectToServerPut(ip, name) => {
|
|
println!("[Network] ConnectToServer() called");
|
|
|
|
// Actual server connection
|
|
|
|
shared_data = match P2PSharedData::new(name.clone(), event_tx.clone()) {
|
|
Ok(sd) => Some(sd),
|
|
Err(e) => {
|
|
let mut err_msg = String::from("failed to initialize socket: ");
|
|
err_msg += &e.to_string();
|
|
match event_tx.send(NetworkEvent::Error(err_msg, name.to_owned())) {
|
|
Ok(_) => {}
|
|
Err(err) => {
|
|
println!("Network Event Error : {}", err.to_string());
|
|
}
|
|
};
|
|
match event_tx.send(NetworkEvent::Disconnected()) {
|
|
Ok(_) => {}
|
|
Err(err) => {
|
|
println!("Network Event Error : {}", err.to_string());
|
|
}
|
|
};
|
|
None
|
|
}
|
|
};
|
|
|
|
if let Some(sd) = shared_data.as_ref() {
|
|
if let Err(e) = register_with_the_server(&sd.cryptopair(), &ip).await {
|
|
let mut err_msg = String::from("request failed: ");
|
|
err_msg += &e.to_string();
|
|
match event_tx.send(NetworkEvent::Error(err_msg, name.to_owned())) {
|
|
Ok(_) => {}
|
|
Err(err) => {
|
|
println!("Network Event Error : {}", err.to_string());
|
|
}
|
|
};
|
|
match event_tx.send(NetworkEvent::Disconnected()) {
|
|
Ok(_) => {}
|
|
Err(err) => {
|
|
println!("Network Event Error : {}", err.to_string());
|
|
}
|
|
};
|
|
} else {
|
|
match event_tx.send(NetworkEvent::Connected(ip)) {
|
|
Ok(_) => {}
|
|
Err(err) => {
|
|
println!("Network Event Error : {}", err.to_string());
|
|
}
|
|
};
|
|
println!("username created: {}", sd.cryptopair().username);
|
|
}
|
|
}
|
|
}
|
|
NetworkCommand::FetchPeerList(ip) => {
|
|
println!("[Network] FetchPeerList() called");
|
|
if ip == "" {
|
|
match event_tx.send(NetworkEvent::Error(
|
|
"Not registered to any server".to_string(),
|
|
"".to_owned(),
|
|
)) {
|
|
Ok(_) => {}
|
|
Err(err) => {
|
|
println!("Network Event Error : {}", err.to_string());
|
|
}
|
|
};
|
|
} else {
|
|
println!("cc");
|
|
match get_peer_list(ip).await {
|
|
Ok(body) => match String::from_utf8(body.to_vec()) {
|
|
Ok(peers_list) => {
|
|
let mut peers: Vec<(String, bool)> = Vec::new();
|
|
let mut current = String::new();
|
|
for i in peers_list.chars() {
|
|
if i == '\n' {
|
|
peers.push((current.clone(), false));
|
|
current.clear();
|
|
} else {
|
|
current.push(i);
|
|
}
|
|
}
|
|
match event_tx.send(NetworkEvent::PeerListUpdated(peers)) {
|
|
Ok(_) => {}
|
|
Err(err) => {
|
|
println!(
|
|
"Network Event Error : {}",
|
|
err.to_string()
|
|
);
|
|
}
|
|
};
|
|
}
|
|
Err(e) => {
|
|
eprintln!("invalid UTF-8 in socket address bytes: {}", e);
|
|
}
|
|
},
|
|
Err(e) => println!("error : {}", e),
|
|
}
|
|
}
|
|
}
|
|
NetworkCommand::RegisterAsPeer(_) => {
|
|
println!("[Network] RegisterAsPeer() called");
|
|
}
|
|
NetworkCommand::Ping(str, ip) => {
|
|
println!("[Network] Ping({}) called", str);
|
|
if let Some(sd) = shared_data.as_ref() {
|
|
let id = generate_id();
|
|
sd.add_message(id, EventType::Ping);
|
|
let peer_address =
|
|
get_socket_address(str.to_owned(), ip, shared_data.as_ref()).await;
|
|
match peer_address {
|
|
Ok(addr) => {
|
|
match event_tx.send(NetworkEvent::Success(
|
|
format!(
|
|
"Successfully sent ping message to {}.",
|
|
addr.to_string(),
|
|
),
|
|
str.to_owned(),
|
|
)) {
|
|
Ok(_) => {}
|
|
Err(e) => {
|
|
eprintln!("NetworkEvent error : {}", e);
|
|
}
|
|
};
|
|
}
|
|
Err(err_msg) => {
|
|
match event_tx
|
|
.send(NetworkEvent::Error(err_msg.to_string(), str))
|
|
{
|
|
Ok(_) => {}
|
|
Err(e) => {
|
|
eprintln!("NetworkEvent error : {}", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
NetworkCommand::Disconnect() => {
|
|
if let Some(sd) = shared_data.as_ref() {
|
|
println!("Disconnecting: {}", &sd.cryptopair().username);
|
|
shared_data = None;
|
|
match event_tx.send(NetworkEvent::Disconnected()) {
|
|
Ok(_) => {}
|
|
Err(e) => {
|
|
eprintln!("NetworkEvent error : {}", e);
|
|
}
|
|
}
|
|
} else {
|
|
println!("no p2p data");
|
|
}
|
|
}
|
|
NetworkCommand::ResetServerPeer() => {
|
|
if let Some(sd) = shared_data.as_ref() {
|
|
sd.set_servername("".to_string());
|
|
} else {
|
|
println!("no p2p data");
|
|
}
|
|
}
|
|
NetworkCommand::NatTraversal(username, ip) => {
|
|
if let Some(sd) = shared_data.as_ref() {
|
|
println!("username:{}, ip:{}", username, ip);
|
|
// user server to send nattraversal request
|
|
let server_addr = sd.serveraddress();
|
|
let peer_addr_query = get_socket_address(
|
|
username.clone(),
|
|
ip.clone(),
|
|
shared_data.as_ref(),
|
|
);
|
|
|
|
match peer_addr_query.await {
|
|
Ok(peer_addr) => {
|
|
let payload = socket_addr_to_vec(peer_addr);
|
|
|
|
print!("{:?}", payload.clone());
|
|
|
|
let id = generate_id();
|
|
let natreq = construct_message(
|
|
NATTRAVERSALREQUEST,
|
|
payload.clone(),
|
|
id.clone(),
|
|
&sd.cryptopair(),
|
|
);
|
|
|
|
sd.add_message(id, EventType::NatTraversal);
|
|
sd.senders_ref().send_dispatch(
|
|
natreq.expect(
|
|
"couldnt construct message nattraversalrequest2",
|
|
),
|
|
server_addr.to_string(),
|
|
false,
|
|
);
|
|
}
|
|
Err(err_msg) => {
|
|
match event_tx
|
|
.send(NetworkEvent::Error(err_msg.to_string(), username))
|
|
{
|
|
Ok(_) => {}
|
|
Err(e) => {
|
|
eprintln!("NetworkEvent error : {}", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
sleep(std::time::Duration::from_millis(50)).await;
|
|
}
|
|
})
|
|
}
|
|
|
|
/// Encodes `addr` as the IP octets (4 bytes for IPv4, 16 for IPv6) followed by the port
/// as two big-endian bytes.
fn socket_addr_to_vec(addr: SocketAddr) -> Vec<u8> {
    let port = addr.port().to_be_bytes();
    match addr.ip() {
        IpAddr::V4(ip) => [&ip.octets()[..], &port[..]].concat(),
        IpAddr::V6(ip) => [&ip.octets()[..], &port[..]].concat(),
    }
}
|
|
|
|
async fn quick_ping(addr: &SocketAddr, timeout_ms: u64, sd: &P2PSharedData) -> bool {
|
|
let id = generate_id();
|
|
let pingreq = construct_message(PING, Vec::new(), id, &sd.shared_cryptopair);
|
|
|
|
if let Some(ping) = pingreq {
|
|
sd.add_message(id, EventType::Ping);
|
|
sd.senders_ref()
|
|
.send_dispatch(ping, addr.to_string(), false);
|
|
}
|
|
|
|
sleep(Duration::from_millis(timeout_ms)).await;
|
|
|
|
let msg_list = sd.messages_list_ref().lock().expect("yooo");
|
|
let res = !msg_list.contains_key(&id);
|
|
|
|
for (id, evt) in msg_list.iter() {
|
|
println!("id : {}, evt : {}", id, evt.to_string());
|
|
}
|
|
println!("message list doesnt contain key? {}", res);
|
|
|
|
res
|
|
}
|
|
|
|
///
|
|
/// sends a get request to the server to get the socket address of the given peer
|
|
///
|
|
pub async fn get_socket_address(
|
|
username: String,
|
|
ip: String,
|
|
shared_data: Option<&P2PSharedData>,
|
|
) -> Result<SocketAddr, FetchSocketAddressError> {
|
|
let sd = shared_data.expect("No shared data");
|
|
|
|
let client = match Client::builder().timeout(Duration::from_secs(5)).build() {
|
|
Ok(c) => c,
|
|
Err(e) => {
|
|
return Err(FetchSocketAddressError::ClientError(e.to_string()));
|
|
}
|
|
};
|
|
|
|
let uri = format!("{}/peers/{}/addresses", ip, username);
|
|
let res = match client.get(&uri).send().await {
|
|
Ok(r) => r,
|
|
Err(e) => return Err(FetchSocketAddressError::ClientError(e.to_string())),
|
|
};
|
|
|
|
if res.status().is_success() {
|
|
println!("Successfully retrieved the addresses. {}", res.status());
|
|
} else {
|
|
eprintln!(
|
|
"Failed to get the peers addresses from the server. Status: {}",
|
|
res.status()
|
|
);
|
|
}
|
|
|
|
let body = match res.bytes().await {
|
|
Ok(b) => b,
|
|
Err(e) => {
|
|
return Err(FetchSocketAddressError::ClientError(e.to_string()));
|
|
}
|
|
};
|
|
|
|
let s = match String::from_utf8(body.to_vec()) {
|
|
Ok(st) => st,
|
|
Err(e) => {
|
|
return Err(FetchSocketAddressError::ClientError(e.to_string()));
|
|
}
|
|
};
|
|
|
|
let addresses: Vec<SocketAddr> = {
|
|
let temp = parse_addresses(&s);
|
|
temp.iter()
|
|
.filter_map(|a| match a {
|
|
SocketAddr::V4(_) => Some(*a),
|
|
SocketAddr::V6(_) => None,
|
|
})
|
|
.collect()
|
|
};
|
|
|
|
if addresses.is_empty() {
|
|
return Err(FetchSocketAddressError::NoRegisteredAddresses);
|
|
} else if !addresses.iter().any(|a| matches!(a, SocketAddr::V4(_))) {
|
|
return Err(FetchSocketAddressError::NoIPV4Address);
|
|
}
|
|
|
|
for addr in addresses {
|
|
println!("trying address : {}", addr);
|
|
if quick_ping(&addr, 1000, sd).await {
|
|
return Ok(addr);
|
|
}
|
|
|
|
let payload = socket_addr_to_vec(addr);
|
|
let id = generate_id();
|
|
|
|
let natreq = construct_message(NATTRAVERSALREQUEST, payload.clone(), id, &sd.cryptopair());
|
|
|
|
sd.add_message(id, EventType::NatTraversal);
|
|
sd.senders_ref().send_dispatch(
|
|
natreq.expect("couldnt construct message nattraversalrequest2"),
|
|
sd.serveraddress().to_string(),
|
|
false,
|
|
);
|
|
|
|
sleep(Duration::from_millis(1000)).await;
|
|
|
|
let maybe_entry = {
|
|
let guard = sd.messages_received_ref().lock().unwrap();
|
|
guard.clone()
|
|
}; // guard dropped
|
|
|
|
for (id, (evt, time)) in maybe_entry.iter() {
|
|
println!("{} : {} at {}", id, evt.to_string(), time.to_string());
|
|
if id.eq(&addr.to_string()) && Timestamp::now().diff(time) < 10 {
|
|
println!("received message from address, returning said address..");
|
|
return Ok(addr);
|
|
}
|
|
}
|
|
|
|
if quick_ping(&addr, 5000, sd).await {
|
|
return Ok(addr);
|
|
}
|
|
}
|
|
|
|
Err(FetchSocketAddressError::NoResponseFromUser)
|
|
}
|
|
|
|
pub async fn get_server_address(username: String, ip: String) -> Option<SocketAddr> {
|
|
let client = Client::builder()
|
|
.timeout(Duration::from_secs(5))
|
|
.build()
|
|
.expect("cannot create client");
|
|
let uri = format!("{}/peers/{}/addresses", ip, username);
|
|
let res = client.get(uri).send().await.expect("couldnt get response");
|
|
if res.status().is_success() {
|
|
println!("Successfully retreived the addresses. {}", res.status());
|
|
} else {
|
|
eprintln!(
|
|
"Failed to get the peers addresses from the server. Status: {}",
|
|
res.status()
|
|
);
|
|
}
|
|
let body: Bytes = res.bytes().await.expect("couldnt get bytes");
|
|
|
|
match String::from_utf8(body.to_vec()) {
|
|
Ok(s) => {
|
|
let addresses = parse_addresses(&s);
|
|
if let Some(first) = addresses.first() {
|
|
Some(first.clone())
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
Err(_) => None,
|
|
}
|
|
}
|