message handling and serv registration
This commit is contained in:
@@ -25,6 +25,7 @@ pub struct P2PClientApp {
|
|||||||
status_message: String,
|
status_message: String,
|
||||||
known_peers: Vec<String>,
|
known_peers: Vec<String>,
|
||||||
connect_address_input: String,
|
connect_address_input: String,
|
||||||
|
connect_server_name_input: String,
|
||||||
|
|
||||||
// Key: Parent Directory Hash (String), Value: List of children FileNode
|
// Key: Parent Directory Hash (String), Value: List of children FileNode
|
||||||
loaded_fs: HashMap<String, MerkleTree>,
|
loaded_fs: HashMap<String, MerkleTree>,
|
||||||
@@ -48,7 +49,8 @@ impl P2PClientApp {
|
|||||||
network_event_rx: event_rx,
|
network_event_rx: event_rx,
|
||||||
status_message: "Client Initialized. Awaiting network status...".to_string(),
|
status_message: "Client Initialized. Awaiting network status...".to_string(),
|
||||||
known_peers: vec!["bob".to_string()],
|
known_peers: vec!["bob".to_string()],
|
||||||
connect_address_input: "127.0.0.1:8080".to_string(),
|
connect_address_input: "https://jch.irif.fr:8443".to_string(),
|
||||||
|
connect_server_name_input: "jch.irif.fr".to_string(),
|
||||||
loaded_fs,
|
loaded_fs,
|
||||||
active_peer: None,
|
active_peer: None,
|
||||||
server_status: ServerStatus::Loading,
|
server_status: ServerStatus::Loading,
|
||||||
@@ -125,11 +127,13 @@ impl eframe::App for P2PClientApp {
|
|||||||
ui.horizontal(|ui| {
|
ui.horizontal(|ui| {
|
||||||
ui.label("Server IP:");
|
ui.label("Server IP:");
|
||||||
ui.text_edit_singleline(&mut self.connect_address_input);
|
ui.text_edit_singleline(&mut self.connect_address_input);
|
||||||
|
ui.text_edit_singleline(&mut self.connect_server_name_input);
|
||||||
if ui.button("Connect").clicked() {
|
if ui.button("Connect").clicked() {
|
||||||
let addr = self.connect_address_input.clone();
|
let addr = self.connect_address_input.clone();
|
||||||
|
let serv_name = self.connect_server_name_input.clone();
|
||||||
let _ = self
|
let _ = self
|
||||||
.network_cmd_tx
|
.network_cmd_tx
|
||||||
.send(NetworkCommand::ConnectToServer(addr));
|
.send(NetworkCommand::ConnectToServer(addr, serv_name));
|
||||||
self.server_status = ServerStatus::Loading;
|
self.server_status = ServerStatus::Loading;
|
||||||
ui.close();
|
ui.close();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,7 +9,9 @@ use crate::{
|
|||||||
cryptographic_signature::CryptographicSignature,
|
cryptographic_signature::CryptographicSignature,
|
||||||
message_handling::EventType,
|
message_handling::EventType,
|
||||||
messages_channels::{MultipleSenders, start_receving_thread},
|
messages_channels::{MultipleSenders, start_receving_thread},
|
||||||
registration::{register_ip_addresses, register_with_the_server},
|
registration::{
|
||||||
|
get_socket_address, parse_addresses, register_ip_addresses, register_with_the_server,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::{
|
use std::{
|
||||||
@@ -19,8 +21,8 @@ use std::{
|
|||||||
|
|
||||||
/// Messages sent to the Network thread by the GUI.
|
/// Messages sent to the Network thread by the GUI.
|
||||||
pub enum NetworkCommand {
|
pub enum NetworkCommand {
|
||||||
ConnectToServer(String), // ServerIP
|
ConnectToServer(String, String), // ServerIP
|
||||||
FetchPeerList(String), // ServerIP
|
FetchPeerList(String), // ServerIP
|
||||||
RegisterAsPeer(String),
|
RegisterAsPeer(String),
|
||||||
Ping(),
|
Ping(),
|
||||||
ConnectPeer(String), // IP:PORT
|
ConnectPeer(String), // IP:PORT
|
||||||
@@ -95,7 +97,7 @@ pub fn start_p2p_executor(
|
|||||||
NetworkCommand::RequestChunk(_, _) => {
|
NetworkCommand::RequestChunk(_, _) => {
|
||||||
println!("[Network] RequestChunk() called");
|
println!("[Network] RequestChunk() called");
|
||||||
}
|
}
|
||||||
NetworkCommand::ConnectToServer(ip) => {
|
NetworkCommand::ConnectToServer(ip, server_name) => {
|
||||||
println!("[Network] ConnectToServer() called");
|
println!("[Network] ConnectToServer() called");
|
||||||
|
|
||||||
// Actual server connection
|
// Actual server connection
|
||||||
@@ -119,25 +121,41 @@ pub fn start_p2p_executor(
|
|||||||
eprintln!("request failed: {}", e);
|
eprintln!("request failed: {}", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
match SocketAddr::from_str(&ip) {
|
println!("ip: {}", ip);
|
||||||
Ok(sockaddr) => {
|
|
||||||
start_receving_thread(
|
let server_addr_query = get_socket_address(server_name);
|
||||||
&shared_socket,
|
|
||||||
&shared_messageslist,
|
match server_addr_query.await {
|
||||||
&shared_cryptopair,
|
Ok(sockaddr_bytes) => {
|
||||||
sockaddr,
|
match String::from_utf8(sockaddr_bytes.to_vec()) {
|
||||||
&shared_senders,
|
Ok(s) => {
|
||||||
);
|
let addresses = parse_addresses(&s);
|
||||||
register_ip_addresses(
|
if let Some(first) = addresses.first() {
|
||||||
&shared_cryptopair,
|
// first: &SocketAddr
|
||||||
ip,
|
start_receving_thread(
|
||||||
&shared_senders,
|
&shared_socket,
|
||||||
&shared_messageslist,
|
&shared_messageslist,
|
||||||
);
|
&shared_cryptopair,
|
||||||
}
|
*first, // copie le SocketAddr (implémente Copy pour SocketAddr)
|
||||||
Err(e) => {
|
&shared_senders,
|
||||||
eprintln!("failed to parse socket address: {}", e);
|
);
|
||||||
|
register_ip_addresses(
|
||||||
|
&shared_cryptopair,
|
||||||
|
first.to_string(),
|
||||||
|
&shared_senders,
|
||||||
|
&shared_messageslist,
|
||||||
|
545,
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
eprintln!("no valid socket addresses found in: {}", s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("invalid UTF-8 in socket address bytes: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Err(e) => eprintln!("failed to retrieve socket address: {}", e),
|
||||||
}
|
}
|
||||||
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(5000)).await;
|
tokio::time::sleep(std::time::Duration::from_millis(5000)).await;
|
||||||
|
|||||||
95
client-network/src/message_handling.rs
Normal file
95
client-network/src/message_handling.rs
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
use crate::{
|
||||||
|
cryptographic_signature::{CryptographicSignature, sign_message},
|
||||||
|
messages_channels::MultipleSenders,
|
||||||
|
messages_structure::HandshakeMessage,
|
||||||
|
registration,
|
||||||
|
};
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::{collections::HashMap, net::SocketAddr};
|
||||||
|
|
||||||
|
pub enum EventType {
|
||||||
|
ServerHelloReply,
|
||||||
|
PeerHelloReply,
|
||||||
|
PeerHello,
|
||||||
|
}
|
||||||
|
|
||||||
|
/*pub fn handle_recevied_message(
|
||||||
|
messages_list: &mut HashMap<i32, EventType>,
|
||||||
|
recevied_message: &Vec<u8>,
|
||||||
|
crypto_pair: &CryptographicSignature,
|
||||||
|
socket_addr: &SocketAddr,
|
||||||
|
senders: &MultipleSenders,
|
||||||
|
) {
|
||||||
|
let message_id: [u8; 4] = recevied_message[0..4].try_into().expect("size error");
|
||||||
|
let id = i32::from_be_bytes(message_id);
|
||||||
|
let eventtype = messages_list.get(&id);
|
||||||
|
match eventtype {
|
||||||
|
Some(EventType::ServerHelloReply) => {
|
||||||
|
registration::register_ip_addresses(
|
||||||
|
&crypto_pair,
|
||||||
|
socket_addr.ip().to_string(),
|
||||||
|
&senders,
|
||||||
|
messages_list,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Some(_) => print!("Not implemented"),
|
||||||
|
None => print!("Message not found"),
|
||||||
|
}
|
||||||
|
}*/
|
||||||
|
|
||||||
|
pub fn handle_recevied_message(
|
||||||
|
messages_list: &Arc<Mutex<HashMap<i32, EventType>>>,
|
||||||
|
recevied_message: &Vec<u8>,
|
||||||
|
crypto_pair: &CryptographicSignature,
|
||||||
|
socket_addr: &SocketAddr,
|
||||||
|
senders: &MultipleSenders,
|
||||||
|
) {
|
||||||
|
if recevied_message.len() < 4 {
|
||||||
|
return;
|
||||||
|
} // Basic safety check
|
||||||
|
|
||||||
|
let message_id: [u8; 4] = recevied_message[0..4].try_into().expect("size error");
|
||||||
|
let id = i32::from_be_bytes(message_id);
|
||||||
|
|
||||||
|
// Lock the mutex to access the HashMap
|
||||||
|
let list = messages_list.lock().unwrap();
|
||||||
|
|
||||||
|
let eventtype = list.get(&id); // Clone the enum so we can release the lock if needed
|
||||||
|
match eventtype {
|
||||||
|
Some(EventType::ServerHelloReply) => {
|
||||||
|
/*registration::register_ip_addresses(
|
||||||
|
crypto_pair,
|
||||||
|
socket_addr.to_string(),
|
||||||
|
senders,
|
||||||
|
&messages_list, // Pass the mutable reference inside the lock
|
||||||
|
546,
|
||||||
|
);*/
|
||||||
|
}
|
||||||
|
Some(_) => print!("Not implemented"),
|
||||||
|
None => {
|
||||||
|
let message_type = recevied_message[4];
|
||||||
|
if message_type == 1 {
|
||||||
|
let username_size = crypto_pair.username.len();
|
||||||
|
let hello_handshake = HandshakeMessage::helloReply(
|
||||||
|
id as u32,
|
||||||
|
username_size as u16 + 4,
|
||||||
|
crypto_pair.username.clone(),
|
||||||
|
);
|
||||||
|
//HandshakeMessage::display(&hello_handshake);
|
||||||
|
let hello_handshake_serialized = hello_handshake.serialize();
|
||||||
|
let message_signed = sign_message(crypto_pair, &hello_handshake_serialized);
|
||||||
|
senders.send_via(0, message_signed, socket_addr.to_string());
|
||||||
|
let mut list = messages_list.lock().expect("Failed to lock messages_list");
|
||||||
|
match list.get(&id) {
|
||||||
|
Some(_) => {
|
||||||
|
list.remove(&id);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
list.insert(id, EventType::ServerHelloReply);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
print!("Message not found for ID: {}", id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -47,7 +47,10 @@ impl MultipleSenders {
|
|||||||
for msg in rx {
|
for msg in rx {
|
||||||
// Use the cloned Arc inside the thread
|
// Use the cloned Arc inside the thread
|
||||||
if let Err(e) = sock_clone.send_to(&msg.payload, &msg.address) {
|
if let Err(e) = sock_clone.send_to(&msg.payload, &msg.address) {
|
||||||
eprintln!("Erreur d'envoi sur canal {}: {}", i, e);
|
eprintln!(
|
||||||
|
"Erreur d'envoi sur canal {}: {}, address: {}",
|
||||||
|
i, e, &msg.address
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
let message_id: [u8; 4] = msg.payload[0..4].try_into().expect("size error");
|
let message_id: [u8; 4] = msg.payload[0..4].try_into().expect("size error");
|
||||||
let id = i32::from_be_bytes(message_id);
|
let id = i32::from_be_bytes(message_id);
|
||||||
@@ -85,6 +88,7 @@ impl MultipleSenders {
|
|||||||
let sock_clone = Arc::clone(socket);
|
let sock_clone = Arc::clone(socket);
|
||||||
let cryptopair_clone = Arc::clone(crypto_pair);
|
let cryptopair_clone = Arc::clone(crypto_pair);
|
||||||
let senders_clone = Arc::clone(senders);
|
let senders_clone = Arc::clone(senders);
|
||||||
|
|
||||||
let messages_clone = Arc::clone(messages_list);
|
let messages_clone = Arc::clone(messages_list);
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let mut buf = [0u8; 1024];
|
let mut buf = [0u8; 1024];
|
||||||
@@ -123,9 +127,9 @@ pub fn start_receving_thread(
|
|||||||
loop {
|
loop {
|
||||||
match sock_clone.recv_from(&mut buf) {
|
match sock_clone.recv_from(&mut buf) {
|
||||||
Ok((amt, src)) => {
|
Ok((amt, src)) => {
|
||||||
// Only send the part of the buffer that contains data
|
|
||||||
let received_data = buf[..amt].to_vec();
|
let received_data = buf[..amt].to_vec();
|
||||||
|
|
||||||
|
println!("Reçu {} octets de {}: {:?}", amt, src, received_data);
|
||||||
handle_recevied_message(
|
handle_recevied_message(
|
||||||
&messages_clone,
|
&messages_clone,
|
||||||
&received_data,
|
&received_data,
|
||||||
@@ -133,7 +137,6 @@ pub fn start_receving_thread(
|
|||||||
&socket_addr,
|
&socket_addr,
|
||||||
&senders_clone,
|
&senders_clone,
|
||||||
);
|
);
|
||||||
println!("Reçu {} octets de {}: {:?}", amt, src, received_data);
|
|
||||||
}
|
}
|
||||||
Err(e) => eprintln!("Erreur de réception: {}", e),
|
Err(e) => eprintln!("Erreur de réception: {}", e),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ use crate::messages_structure::{HandshakeMessage, UDPMessage};
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
|
use std::str::FromStr;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
///
|
///
|
||||||
@@ -39,7 +40,7 @@ pub async fn register_with_the_server(
|
|||||||
/// sends a get request to the server to get the socket address of the given peer
|
/// sends a get request to the server to get the socket address of the given peer
|
||||||
///
|
///
|
||||||
|
|
||||||
async fn get_socket_address(username: String) -> Result<Bytes, reqwest::Error> {
|
pub async fn get_socket_address(username: String) -> Result<Bytes, reqwest::Error> {
|
||||||
let client = reqwest::Client::new();
|
let client = reqwest::Client::new();
|
||||||
let uri = format!("https://jch.irif.fr:8443/peers/{}/addresses", username);
|
let uri = format!("https://jch.irif.fr:8443/peers/{}/addresses", username);
|
||||||
let res = client.get(uri).send().await?;
|
let res = client.get(uri).send().await?;
|
||||||
@@ -55,6 +56,20 @@ async fn get_socket_address(username: String) -> Result<Bytes, reqwest::Error> {
|
|||||||
Ok(body)
|
Ok(body)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn parse_addresses(input: &String) -> Vec<SocketAddr> {
|
||||||
|
let mut addrs = Vec::new();
|
||||||
|
for line in input.lines() {
|
||||||
|
let s = line.trim();
|
||||||
|
if s.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if let Ok(sock) = SocketAddr::from_str(s) {
|
||||||
|
addrs.push(sock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
addrs
|
||||||
|
}
|
||||||
|
|
||||||
///
|
///
|
||||||
/// registers the IP addresses by sending a Hello request to the server.
|
/// registers the IP addresses by sending a Hello request to the server.
|
||||||
///
|
///
|
||||||
@@ -63,21 +78,25 @@ pub fn register_ip_addresses(
|
|||||||
server_uri: String,
|
server_uri: String,
|
||||||
senders: &MultipleSenders,
|
senders: &MultipleSenders,
|
||||||
messages_list: &Arc<Mutex<HashMap<i32, EventType>>>,
|
messages_list: &Arc<Mutex<HashMap<i32, EventType>>>,
|
||||||
|
id: i32,
|
||||||
) {
|
) {
|
||||||
let username_size = crypto_pair.username.len();
|
let username_size = crypto_pair.username.len();
|
||||||
let hello_handshake =
|
let hello_handshake = HandshakeMessage::hello(
|
||||||
HandshakeMessage::hello(545, username_size as u16 + 4, crypto_pair.username.clone());
|
id as u32,
|
||||||
|
username_size as u16 + 4,
|
||||||
|
crypto_pair.username.clone(),
|
||||||
|
);
|
||||||
//HandshakeMessage::display(&hello_handshake);
|
//HandshakeMessage::display(&hello_handshake);
|
||||||
let hello_handshake_serialized = hello_handshake.serialize();
|
let hello_handshake_serialized = hello_handshake.serialize();
|
||||||
let message_signed = sign_message(crypto_pair, &hello_handshake_serialized);
|
let message_signed = sign_message(crypto_pair, &hello_handshake_serialized);
|
||||||
senders.send_via(0, message_signed, server_uri);
|
senders.send_via(0, message_signed, server_uri);
|
||||||
let mut list = messages_list.lock().expect("Failed to lock messages_list");
|
let mut list = messages_list.lock().expect("Failed to lock messages_list");
|
||||||
match list.get(&545) {
|
match list.get(&id) {
|
||||||
Some(_) => {
|
Some(_) => {
|
||||||
list.remove(&545);
|
list.remove(&id);
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
list.insert(545, EventType::ServerHelloReply);
|
list.insert(id, EventType::ServerHelloReply);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// 3. Perform the insertion
|
// 3. Perform the insertion
|
||||||
|
|||||||
12
todo.md
12
todo.md
@@ -1,16 +1,16 @@
|
|||||||
# Todo :
|
# Todo :
|
||||||
## peer discovery
|
## peer discovery
|
||||||
- get rsquest to the uri /peers/
|
- get rsquest to the uri /peers/
|
||||||
|
|
||||||
## registration with the server
|
## registration with the server
|
||||||
- generation of the cryptographic key OK
|
- generation of the cryptographic key OK
|
||||||
- put request to the uri (check if the peer is already connected) OK
|
- put request to the uri (check if the peer is already connected) OK
|
||||||
- udp handshakes
|
- udp handshakes OK
|
||||||
- get request to the uri /peers/key to get the public key of a peer
|
- get request to the uri /peers/key to get the public key of a peer OK
|
||||||
- get request to the uri /peers/key/addresses
|
- get request to the uri /peers/key/addresses OK
|
||||||
|
|
||||||
## handshake
|
## handshake
|
||||||
- handshake structure
|
- handshake structure OK
|
||||||
- 5min timeout after handshake
|
- 5min timeout after handshake
|
||||||
- matain connection every 4 min
|
- matain connection every 4 min
|
||||||
|
|
||||||
@@ -32,4 +32,4 @@ telechargement des fichiers
|
|||||||
choisir un dossier à partager
|
choisir un dossier à partager
|
||||||
se deconnecter du réseau
|
se deconnecter du réseau
|
||||||
|
|
||||||
2 channels -> un pour envoyer et un pour recevoir
|
2 channels -> un pour envoyer et un pour recevoir OK
|
||||||
|
|||||||
Reference in New Issue
Block a user