diff --git a/client-gui/src/gui_app.rs b/client-gui/src/gui_app.rs index b271593..71fbcc7 100644 --- a/client-gui/src/gui_app.rs +++ b/client-gui/src/gui_app.rs @@ -280,6 +280,10 @@ impl eframe::App for P2PClientApp { _ => {} } println!("remaining chunks size: {}", self.remaining_chunks.len()); + match create_dir("./Download/") { + Ok(_) => println!("Directory created successfully!"), + Err(e) => println!("Failed to create directory: {}", e), + } if self.remaining_chunks.is_empty() { let file = OpenOptions::new().append(true).create(true).open( "./Download/".to_string() diff --git a/client-network/src/data.rs b/client-network/src/data.rs index 4479ffa..59e44f8 100644 --- a/client-network/src/data.rs +++ b/client-network/src/data.rs @@ -474,10 +474,8 @@ pub fn big_or_chunk_to_file(tree: &MerkleTree, node: &MerkleNode, file: &mut Fil } MerkleNode::Chunk(chunk) => { if !chunk.data.is_empty() { - // Enlève le premier élément - let mut data = chunk.data.clone(); // Clone pour éviter de modifier l'original - data.remove(0); // Enlève le premier élément - println!("wrote data {:?}", data); + let mut data = chunk.data.clone(); + data.remove(0); let _ = file.write(&data); } else { println!("chunk.data is empty, nothing to write"); diff --git a/client-network/src/datum_parsing.rs b/client-network/src/datum_parsing.rs index 3797527..5efb93f 100644 --- a/client-network/src/datum_parsing.rs +++ b/client-network/src/datum_parsing.rs @@ -13,7 +13,7 @@ pub fn parse_received_datum( let hash_name: [u8; 32] = recevied_datum[..32].try_into().expect("error"); let value = &recevied_datum[32..datum_length]; let value_slice = value.to_vec(); - println!("valueslice: {:?}, {}", value_slice, value_slice.len()); + // println!("valueslice: {:?}, {}", value_slice, value_slice.len()); println!( "((value_slice.len() - 1) / 32) {} ", diff --git a/client-network/src/lib.rs b/client-network/src/lib.rs index 6cf1057..ba3cc06 100644 --- a/client-network/src/lib.rs +++ b/client-network/src/lib.rs @@ -73,7 +73,13 @@ impl P2PSharedData { let mut threads = Vec::new(); - let senders = MultipleSenders::new(5, &shared_socket, cmd_tx, &mut threads); + let senders = MultipleSenders::new( + 5, + &shared_socket, + cmd_tx, + &mut threads, + shared_messageslist.clone(), + ); let shared_senders = Arc::new(senders); let server_name = Arc::new(Mutex::new("".to_string())); let server_address = Arc::new(Mutex::new("".to_string())); diff --git a/client-network/src/messages_channels.rs b/client-network/src/messages_channels.rs index d7da7b6..ed0fd65 100644 --- a/client-network/src/messages_channels.rs +++ b/client-network/src/messages_channels.rs @@ -50,6 +50,7 @@ impl MultipleSenders { socket: &Arc, cmd_tx: crossbeam_channel::Sender, threads: &mut Vec, + messages_list: Arc>>, ) -> Self { let (tx1, rx1) = crossbeam_channel::unbounded(); @@ -57,13 +58,12 @@ impl MultipleSenders { let sock_clone = Arc::clone(&socket); let cmd_tx_clone = cmd_tx.clone(); let rx: Receiver = rx1.clone(); + let msg_list_clone = messages_list.clone(); let thread = thread::spawn(move || { println!("Canal d'envoi {} prêt", i); loop { - // Priorité aux messages en attente prêts à être réessayés - // Si aucun retry prêt, on bloque sur rx avec timeout court, pour pouvoir traiter les timers let msg = rx.recv().unwrap(); match sock_clone.send_to(&msg.payload, &msg.address) { Ok(_) => { @@ -84,6 +84,13 @@ impl MultipleSenders { "Erreur d'envoi initial sur canal {}: {}, address: {}", i, e, &msg.address ); + let mut guard = msg_list_clone.lock().unwrap(); + let message_id: [u8; 4] = + msg.payload[0..4].try_into().expect("size error"); + + let id = i32::from_be_bytes(message_id); + guard.remove_entry(&id); + drop(guard); } } } @@ -103,35 +110,6 @@ impl MultipleSenders { completed_messages: HashSet::new(), } } - /* - /// Envoie un message via un canal spécifique (round-robin ou index précis) - pub fn send_via( - &self, - channel_idx: usize, - data: Vec, - remote_addr: String, - is_resp_to_server_handshake: bool, - messages_list: &Mutex>, - ) { - println!( - "is_resp_to_server_handshake {}", - is_resp_to_server_handshake - ); - let msg_to_send = Message { - payload: data.clone(), - address: remote_addr, - is_resp_to_server_handshake, - }; - if let Some(sender) = self.senders.get(channel_idx) { - let _ = sender.send(msg_to_send); - } - if !is_resp_to_server_handshake { - let mut guard = messages_list.lock().unwrap(); - let message_id: [u8; 4] = data[0..4].try_into().expect("size error"); - let id = i32::from_be_bytes(message_id); - guard.insert(id, EventType::SendRootRequest); - } - }*/ pub fn send_dispatch( &self, @@ -199,8 +177,7 @@ pub fn start_retry_thread( if guard.contains_key(&id) { drop(guard); - // si le message est n'a pas encore a etre traité, on le - // remet en queue de liste + if front.next_try <= SystemTime::now() .duration_since(UNIX_EPOCH) @@ -234,8 +211,8 @@ pub fn start_retry_thread( ); let base: u64 = 2; - // let backoff = base.saturating_pow(attempt as u32); // 2^1 == 2 seconds - let backoff = 1; + let backoff = base.saturating_pow(attempt as u32); // 2^1 == 2 seconds + //let backoff = 1; let newretry = RetryMessage { next_try: SystemTime::now() .duration_since(UNIX_EPOCH) @@ -246,10 +223,10 @@ pub fn start_retry_thread( attempts: attempt, }; - q.push_back(newretry); // remettre en tête pour réessayer plus tôt + q.push_back(newretry); } } else { - q.push_back(front); // remettre en tête pour réessayer plus tôt + q.push_back(front); } } } @@ -279,7 +256,7 @@ pub fn start_receving_thread( Ok((amt, src)) => { let received_data = buf[..amt].to_vec(); - println!("Reçu {} octets de {}: {:?}", amt, src, received_data); + println!("Reçu {} octets de {}", amt, src); handle_recevied_message( &messages_clone, &messages_received_clone,