wip messages handling

This commit is contained in:
2025-12-23 17:05:29 +01:00
parent 1844037488
commit ced0c992e7
5 changed files with 240 additions and 26 deletions

View File

@@ -34,4 +34,6 @@ async fn main() -> eframe::Result<()> {
Ok(Box::new(app))
}),
)
// Starts the protocol
}

View File

@@ -45,7 +45,7 @@ pub fn formatPubKey(crypto_pair: CryptographicSignature) -> String {
///
/// takes a serialized message and adds the signature using the private key
///
pub fn sign_message(crypto_pair: CryptographicSignature, message: Vec<u8>) -> Vec<u8> {
pub fn sign_message(crypto_pair: &CryptographicSignature, message: &Vec<u8>) -> Vec<u8> {
let length_bytes: [u8; 2] = message[5..7]
.try_into()
.expect("slice with incorrect length");
@@ -105,7 +105,7 @@ mod tests {
let crypto_pair = CryptographicSignature::new(username.clone());
let handshake = HandshakeMessage::hello(0, 12, username);
let ser = handshake.serialize();
let signed_message = sign_message(crypto_pair, ser.clone());
let signed_message = sign_message(&crypto_pair, &ser);
println!("unsigned_message: {:?}", ser);
println!("signed_message: {:?}", signed_message);
}

View File

@@ -1,8 +1,22 @@
mod cryptographic_signature;
mod data;
mod message_handling;
mod messages_channels;
mod messages_structure;
mod registration;
use crate::{
cryptographic_signature::CryptographicSignature,
message_handling::EventType,
messages_channels::{MultipleSenders, start_receving_thread},
registration::{register_ip_addresses, register_with_the_server},
};
use std::sync::{Arc, Mutex};
use std::{
net::{SocketAddr, UdpSocket},
str::FromStr,
};
/// Messages sent to the Network thread by the GUI.
pub enum NetworkCommand {
ConnectToServer(String), // ServerIP
@@ -29,6 +43,8 @@ pub enum NetworkEvent {
// ...
}
use std::collections::HashMap;
pub use crate::data::*;
use crossbeam_channel::{Receiver, Sender};
use sha2::{Digest, Sha256};
@@ -84,6 +100,46 @@ pub fn start_p2p_executor(
// Actual server connection
let messages_list = HashMap::<i32, EventType>::new();
let username = String::from("Gamixtreize");
let crypto_pair = CryptographicSignature::new(username);
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind failed");
let shared_socket = Arc::new(socket);
let shared_cryptopair = Arc::new(crypto_pair);
let shared_messageslist = Arc::new(Mutex::new(messages_list));
let senders = MultipleSenders::new(1, &shared_socket);
let shared_senders = Arc::new(senders);
if let Err(e) = register_with_the_server(&shared_cryptopair, &ip).await {
eprintln!("request failed: {}", e);
}
match SocketAddr::from_str(&ip) {
Ok(sockaddr) => {
start_receving_thread(
&shared_socket,
&shared_messageslist,
&shared_cryptopair,
sockaddr,
&shared_senders,
);
register_ip_addresses(
&shared_cryptopair,
ip,
&shared_senders,
&shared_messageslist,
);
}
Err(e) => {
eprintln!("failed to parse socket address: {}", e);
}
}
tokio::time::sleep(std::time::Duration::from_millis(5000)).await;
let res = event_tx.send(NetworkEvent::Connected());

View File

@@ -0,0 +1,142 @@
use crate::cryptographic_signature::CryptographicSignature;
use crate::message_handling::EventType;
use crate::message_handling::handle_recevied_message;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::net::UdpSocket;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{self, Sender};
use std::thread;
pub struct MultipleSenders {
senders: Vec<Sender<Message>>,
}
pub struct Message {
payload: Vec<u8>,
address: String,
}
impl Message {
pub fn new(payload: Vec<u8>, address: String) -> Self {
Message {
payload: payload,
address: address,
}
}
}
impl MultipleSenders {
pub fn new(num_channels: usize, socket: &Arc<UdpSocket>) -> Self {
let mut senders = Vec::new();
// Wrap the socket in an Arc so it can be shared across threads
for i in 0..num_channels {
let (tx, rx) = mpsc::channel::<Message>();
// Clone the Arc (this just bumps the reference count, it doesn't copy the socket)
let sock_clone = Arc::clone(&socket);
senders.push(tx);
thread::spawn(move || {
println!("Canal d'envoi {} prêt", i);
for msg in rx {
// Use the cloned Arc inside the thread
if let Err(e) = sock_clone.send_to(&msg.payload, &msg.address) {
eprintln!("Erreur d'envoi sur canal {}: {}", i, e);
} else {
let message_id: [u8; 4] = msg.payload[0..4].try_into().expect("size error");
let id = i32::from_be_bytes(message_id);
let message_type = msg.payload[4];
println!(
"Message {0} de type {1} envoyé à {2} par le canal {3}",
id, message_type, msg.address, i
);
}
}
});
}
MultipleSenders { senders }
}
/// Envoie un message via un canal spécifique (round-robin ou index précis)
pub fn send_via(&self, channel_idx: usize, data: Vec<u8>, remote_addr: String) {
if let Some(sender) = self.senders.get(channel_idx) {
let _ = sender.send(Message {
payload: data,
address: remote_addr,
});
}
}
}
/*pub fn start_receving_thread(
socket: &Arc<UdpSocket>,
messages_list: &Arc<HashMap<i32, EventType>>,
crypto_pair: &Arc<CryptographicSignature>,
socket_addr: SocketAddr,
senders: &Arc<MultipleSenders>,
) {
let sock_clone = Arc::clone(socket);
let cryptopair_clone = Arc::clone(crypto_pair);
let senders_clone = Arc::clone(senders);
let messages_clone = Arc::clone(messages_list);
thread::spawn(move || {
let mut buf = [0u8; 1024];
loop {
match sock_clone.recv_from(&mut buf) {
Ok((amt, src)) => {
handle_recevied_message(
&messages_clone,
&buf.to_vec(),
&cryptopair_clone,
&socket_addr,
&senders_clone,
);
println!("Reçu {} octets de {}: {:?}", amt, src, &buf[..amt]);
}
Err(e) => eprintln!("Erreur de réception: {}", e),
}
}
});
}*/
pub fn start_receving_thread(
socket: &Arc<UdpSocket>,
messages_list: &Arc<Mutex<HashMap<i32, EventType>>>,
crypto_pair: &Arc<CryptographicSignature>,
socket_addr: SocketAddr,
senders: &Arc<MultipleSenders>,
) {
let sock_clone = Arc::clone(socket);
let cryptopair_clone = Arc::clone(crypto_pair);
let senders_clone = Arc::clone(senders);
let messages_clone = Arc::clone(messages_list);
thread::spawn(move || {
let mut buf = [0u8; 1024];
loop {
match sock_clone.recv_from(&mut buf) {
Ok((amt, src)) => {
// Only send the part of the buffer that contains data
let received_data = buf[..amt].to_vec();
handle_recevied_message(
&messages_clone,
&received_data,
&cryptopair_clone,
&socket_addr,
&senders_clone,
);
println!("Reçu {} octets de {}: {:?}", amt, src, received_data);
}
Err(e) => eprintln!("Erreur de réception: {}", e),
}
}
});
}

View File

@@ -1,36 +1,37 @@
use bytes::Bytes;
use crate::cryptographic_signature::{CryptographicSignature, formatPubKey, sign_message};
use crate::message_handling::EventType;
use crate::messages_channels::{Message, MultipleSenders};
use crate::messages_structure::{HandshakeMessage, UDPMessage};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::net::UdpSocket;
use std::sync::{Arc, Mutex};
///
/// sends the cryptographic signature to the server using a PUT request over the HTTP API.
///
async fn register_with_the_server(
crypto_pair: CryptographicSignature,
pub async fn register_with_the_server(
crypto_pair: &Arc<CryptographicSignature>,
server_uri: &String,
) -> Result<(), reqwest::Error> {
let client = reqwest::Client::new();
let uri = format!(
"https://jch.irif.fr:8443/peers/{}/key",
crypto_pair.username
);
let uri = format!("{0}/peers/{1}/key", server_uri, crypto_pair.username);
let encoded_point = crypto_pair.pub_key.to_encoded_point(false);
let pubkey_bytes = encoded_point.as_ref().to_vec();
let pubkey_bytes_minus = pubkey_bytes[1..].to_vec();
let res = client.put(uri).body(pubkey_bytes_minus).send().await?;
if res.status().is_success() {
println!("Successfully registered with the server.");
let str = hex::encode(res.bytes().await?);
println!("Successfully registered with the server : {}", str);
} else {
eprintln!(
"Failed to register with the server. Status: {}",
res.status()
);
let str = hex::encode(res.bytes().await?);
eprintln!("erreur : {}", str);
}
println!("register ip adresses");
register_ip_addresses(crypto_pair);
Ok(())
}
@@ -57,21 +58,33 @@ async fn get_socket_address(username: String) -> Result<Bytes, reqwest::Error> {
///
/// registers the IP addresses by sending a Hello request to the server.
///
fn register_ip_addresses(crypto_pair: CryptographicSignature) {
let socket = UdpSocket::bind("0.0.0.0:0").expect("bind failed");
pub fn register_ip_addresses(
crypto_pair: &CryptographicSignature,
server_uri: String,
senders: &MultipleSenders,
messages_list: &Arc<Mutex<HashMap<i32, EventType>>>,
) {
let username_size = crypto_pair.username.len();
let hello_handshake =
HandshakeMessage::hello(545, username_size as u16 + 4, crypto_pair.username.clone());
//HandshakeMessage::display(&hello_handshake);
let hello_handshake_serialized = hello_handshake.serialize();
let message_signed = sign_message(crypto_pair, hello_handshake_serialized.to_vec());
socket
.send_to(&message_signed, "81.194.30.229:8443")
.expect("send failed");
let mut buf = [0u8; 1024];
let message_signed = sign_message(crypto_pair, &hello_handshake_serialized);
senders.send_via(0, message_signed, server_uri);
let mut list = messages_list.lock().expect("Failed to lock messages_list");
match list.get(&545) {
Some(_) => {
list.remove(&545);
}
None => {
list.insert(545, EventType::ServerHelloReply);
}
}
// 3. Perform the insertion
/*let mut buf = [0u8; 1024];
socket.recv_from(&mut buf).expect("receive failed");
let hello_handshake_received = UDPMessage::parse(buf.to_vec());
hello_handshake_received.display();
hello_handshake_received.display();*/
}
#[cfg(test)]
@@ -79,19 +92,20 @@ mod tests {
// Note this useful idiom: importing names from outer (for mod tests) scope.
use super::*;
///
/*///
/// does the procedure to register with the server
///
#[tokio::test]
async fn registering_with_server() {
let username = String::from("gamixtreize");
let username = String::from("gameixtreize");
let server_uri = String::from("https://jch.irif.fr:8443");
let crypto_pair = CryptographicSignature::new(username);
if let Err(e) = register_with_the_server(crypto_pair).await {
if let Err(e) = register_with_the_server(crypto_pair, server_uri).await {
eprintln!("Error during registration: {}", e);
}
}
}*/
///
/*///
/// retreives the socket address of a given peer
///
#[tokio::test]
@@ -105,5 +119,5 @@ mod tests {
eprintln!("Erreur HTTP: {}", e);
}
}
}
}*/
}