Files
p2p/client-network/src/messages_channels.rs

146 lines
4.6 KiB
Rust

use crate::cryptographic_signature::CryptographicSignature;
use crate::message_handling::EventType;
use crate::message_handling::handle_recevied_message;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::net::UdpSocket;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{self, Sender};
use std::thread;
/// A set of outgoing message channels.
///
/// Each entry is the sending half of an `mpsc` channel carrying [`Message`]
/// values; `MultipleSenders::new` creates the channels and spawns one thread
/// per channel to consume them.
pub struct MultipleSenders {
/// Sending halves of the channels, indexed by channel number.
senders: Vec<Sender<Message>>,
}
/// An outgoing datagram waiting to be sent.
pub struct Message {
    /// Raw bytes to transmit.
    payload: Vec<u8>,
    /// Destination address, kept as text (the sender passes it to
    /// `UdpSocket::send_to`, which resolves it).
    address: String,
}

impl Message {
    /// Creates a message carrying `payload` and destined for `address`.
    ///
    /// Both arguments are moved into the message; nothing is validated here.
    pub fn new(payload: Vec<u8>, address: String) -> Self {
        Self { payload, address }
    }
}
impl MultipleSenders {
    /// Creates `num_channels` channels and spawns one detached sender thread
    /// per channel.
    ///
    /// Every thread shares `socket` (the `Arc` is cloned, not the socket) and
    /// sends each [`Message`] it receives with `UdpSocket::send_to`. A thread
    /// ends when its channel is closed, i.e. when the `MultipleSenders` that
    /// owns the sending half is dropped.
    ///
    /// After a successful send the thread logs the message header, read as a
    /// 4-byte big-endian id followed by a 1-byte type. Payloads too short to
    /// contain that header are still sent; they are logged with their length
    /// instead of panicking, so a short payload can no longer kill the channel.
    pub fn new(num_channels: usize, socket: &Arc<UdpSocket>) -> Self {
        let mut senders = Vec::with_capacity(num_channels);
        for i in 0..num_channels {
            let (tx, rx) = mpsc::channel::<Message>();
            // Cloning the Arc only bumps the reference count; the socket is shared.
            let sock_clone = Arc::clone(socket);
            senders.push(tx);
            thread::spawn(move || {
                println!("Canal d'envoi {} prêt", i);
                for msg in rx {
                    if let Err(e) = sock_clone.send_to(&msg.payload, &msg.address) {
                        eprintln!(
                            "Erreur d'envoi sur canal {}: {}, address: {}",
                            i, e, &msg.address
                        );
                        continue;
                    }
                    // Logging must never take the thread down: match on the
                    // slice instead of indexing so short payloads are handled.
                    match msg.payload.as_slice() {
                        [b0, b1, b2, b3, message_type, ..] => {
                            let id = i32::from_be_bytes([*b0, *b1, *b2, *b3]);
                            println!(
                                "Message {0} de type {1} envoyé à {2} par le canal {3}",
                                id, message_type, msg.address, i
                            );
                        }
                        short => println!(
                            "Message sans en-tête complet ({} octets) envoyé à {} par le canal {}",
                            short.len(),
                            msg.address,
                            i
                        ),
                    }
                }
            });
        }
        MultipleSenders { senders }
    }

    /// Queues `data` for `remote_addr` on the channel numbered `channel_idx`
    /// (the caller picks the index, e.g. round-robin or a fixed channel).
    ///
    /// The call never blocks and never panics. If `channel_idx` does not
    /// exist, or the channel's thread is gone, the message is dropped and the
    /// reason is reported on stderr.
    pub fn send_via(&self, channel_idx: usize, data: Vec<u8>, remote_addr: String) {
        match self.senders.get(channel_idx) {
            Some(sender) => {
                let message = Message {
                    payload: data,
                    address: remote_addr,
                };
                // `send` only fails when the receiving thread has terminated.
                if sender.send(message).is_err() {
                    eprintln!("Canal d'envoi {} fermé: message abandonné", channel_idx);
                }
            }
            None => eprintln!(
                "Canal d'envoi {} inexistant ({} canaux): message abandonné",
                channel_idx,
                self.senders.len()
            ),
        }
    }
}
/*pub fn start_receving_thread(
socket: &Arc<UdpSocket>,
messages_list: &Arc<HashMap<i32, EventType>>,
crypto_pair: &Arc<CryptographicSignature>,
socket_addr: SocketAddr,
senders: &Arc<MultipleSenders>,
) {
let sock_clone = Arc::clone(socket);
let cryptopair_clone = Arc::clone(crypto_pair);
let senders_clone = Arc::clone(senders);
let messages_clone = Arc::clone(messages_list);
thread::spawn(move || {
let mut buf = [0u8; 1024];
loop {
match sock_clone.recv_from(&mut buf) {
Ok((amt, src)) => {
handle_recevied_message(
&messages_clone,
&buf.to_vec(),
&cryptopair_clone,
&socket_addr,
&senders_clone,
);
println!("Reçu {} octets de {}: {:?}", amt, src, &buf[..amt]);
}
Err(e) => eprintln!("Erreur de réception: {}", e),
}
}
});
}*/
/// Spawns a detached thread that receives datagrams on `socket` forever.
///
/// Each datagram is read into a 1024-byte buffer, copied into a `Vec<u8>` of
/// the received length, logged to stdout, and then passed to
/// `handle_recevied_message` together with the shared message table, the
/// crypto pair, `socket_addr` and the senders. Receive errors are logged to
/// stderr and the loop continues.
///
/// All `Arc` arguments are cloned, so the caller keeps its own handles.
pub fn start_receving_thread(
    socket: &Arc<UdpSocket>,
    messages_list: &Arc<Mutex<HashMap<i32, EventType>>>,
    crypto_pair: &Arc<CryptographicSignature>,
    socket_addr: SocketAddr,
    senders: &Arc<MultipleSenders>,
) {
    // Own handles for the thread; these shadow the borrowed parameters.
    let socket = Arc::clone(socket);
    let crypto_pair = Arc::clone(crypto_pair);
    let senders = Arc::clone(senders);
    let messages_list = Arc::clone(messages_list);

    thread::spawn(move || {
        let mut buffer = [0u8; 1024];
        loop {
            let (len, peer) = match socket.recv_from(&mut buffer) {
                Ok(received) => received,
                Err(err) => {
                    eprintln!("Erreur de réception: {}", err);
                    continue;
                }
            };

            // Only the bytes actually received are forwarded to the handler.
            let datagram = buffer[..len].to_vec();
            println!("Reçu {} octets de {}: {:?}", len, peer, datagram);
            handle_recevied_message(
                &messages_list,
                &datagram,
                &crypto_pair,
                &socket_addr,
                &senders,
            );
        }
    });
}