From 3126811f09ed198a98589d3487afd5473bb69e42 Mon Sep 17 00:00:00 2001 From: Lowa Date: Sun, 16 Feb 2025 03:28:53 +0300 Subject: [PATCH] Improve TFTP server shutdown and error handling - Enhance server stop mechanism with more robust socket and thread management - Add better handling of active transfers during server shutdown - Implement additional safety checks and timeout handling - Improve logging and error reporting for server stop process - Prevent potential deadlocks and resource leaks during server termination --- ComConfigCopy.py | 19 +++- TFTPServer.py | 255 ++++++++++++++++++++++++++++++++++------------- 2 files changed, 201 insertions(+), 73 deletions(-) diff --git a/ComConfigCopy.py b/ComConfigCopy.py index ab5d9cc..d691199 100644 --- a/ComConfigCopy.py +++ b/ComConfigCopy.py @@ -964,9 +964,22 @@ class SerialAppGUI(tk.Tk): """Остановка TFTP сервера.""" if self.tftp_server: try: + # Отключаем кнопки на время остановки сервера + self.start_tftp_button.config(state="disabled") + self.stop_tftp_button.config(state="disabled") + + # Останавливаем сервер self.tftp_server.stop_server() + + # Ждем завершения потока сервера с таймаутом if self.tftp_server_thread: - self.tftp_server_thread.join(timeout=2.0) + self.tftp_server_thread.join(timeout=5.0) + if self.tftp_server_thread.is_alive(): + self.append_tftp_log("[WARN] Превышено время ожидания остановки сервера") + + # Очищаем ссылки на сервер и поток + self.tftp_server = None + self.tftp_server_thread = None # Обновляем состояние кнопок self.start_tftp_button.config(state="normal") @@ -983,6 +996,10 @@ class SerialAppGUI(tk.Tk): except Exception as e: self.append_tftp_log(f"[ERROR] Ошибка остановки сервера: {str(e)}") messagebox.showerror("Ошибка", f"Не удалось остановить TFTP сервер: {str(e)}") + + # Восстанавливаем состояние кнопок в случае ошибки + self.start_tftp_button.config(state="disabled") + self.stop_tftp_button.config(state="normal") def append_tftp_log(self, message): """Добавление сообщения в лог TFTP сервера.""" diff --git a/TFTPServer.py b/TFTPServer.py index 48dace0..8a038e4 100644 --- a/TFTPServer.py +++ b/TFTPServer.py @@ -30,6 +30,7 @@ class TFTPServer: self.running = False self.server_socket = None self.lock = threading.Lock() + self.transfer_sockets = set() # Множество для хранения всех активных сокетов передачи # Словарь активных передач для мониторинга их статуса. # Ключ – адрес клиента, значение – словарь с информацией о передаче. self.active_transfers = {} @@ -60,33 +61,106 @@ class TFTPServer: :param ip: IP-адрес для привязки сервера. :param port: Порт для TFTP сервера. """ + if self.running: + self.log("[WARN] Сервер уже запущен") + return + self.running = True - self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.server_socket.bind((ip, port)) - self.log(f"[INFO] TFTP сервер запущен на {ip}:{port}") try: + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.server_socket.bind((ip, port)) + self.log(f"[INFO] TFTP сервер запущен на {ip}:{port}") + while self.running: try: - # Устанавливаем таймаут для возможности периодической проверки состояния сервера. self.server_socket.settimeout(1.0) data, client_addr = self.server_socket.recvfrom(2048) - if data: - # Каждая обработка запроса выполняется в отдельном потоке. + if data and self.running: threading.Thread(target=self.handle_request, args=(data, client_addr), daemon=True).start() except socket.timeout: continue + except socket.error as e: + if self.running: # Логируем ошибку только если сервер еще запущен + self.log(f"[ERROR] Ошибка получения данных: {str(e)}") + break except Exception as e: - self.log(f"[ERROR] Ошибка получения данных: {str(e)}") + if self.running: # Логируем ошибку только если сервер еще запущен + self.log(f"[ERROR] Ошибка получения данных: {str(e)}") + break + except Exception as e: + self.log(f"[ERROR] Ошибка запуска сервера: {str(e)}") finally: - self.server_socket.close() + self.running = False + if self.server_socket: + try: + self.server_socket.close() + except: + pass + self.server_socket = None self.log("[INFO] TFTP сервер остановлен.") def stop_server(self): """ Остановка TFTP сервера. """ - self.running = False + if not self.running: + return + self.log("[INFO] Остановка TFTP сервера...") + self.running = False + + try: + # Закрываем основной сокет сервера первым + if self.server_socket: + try: + # Создаем временный сокет и отправляем пакет самому себе, + # чтобы разблокировать recvfrom + temp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + server_address = self.server_socket.getsockname() + temp_socket.sendto(b'', server_address) + except: + pass + finally: + try: + temp_socket.close() + except: + pass + + try: + self.server_socket.close() + except Exception as e: + self.log(f"[WARN] Ошибка при закрытии основного сокета: {str(e)}") + finally: + self.server_socket = None + + # Закрываем все активные сокеты передачи + with self.lock: + active_sockets = list(self.transfer_sockets) + self.transfer_sockets.clear() + active_transfers = dict(self.active_transfers) + self.active_transfers.clear() + + # Закрываем сокеты передачи после очистки множества + for sock in active_sockets: + try: + if sock: + sock.close() + except Exception as e: + self.log(f"[WARN] Ошибка при закрытии сокета передачи: {str(e)}") + + # Отправляем сообщения об остановке для активных передач + for client_addr, transfer_info in active_transfers.items(): + try: + self.send_error(client_addr, 0, "Сервер остановлен") + except: + pass + + except Exception as e: + self.log(f"[ERROR] Ошибка при остановке сервера: {str(e)}") + finally: + self.running = False # Гарантируем, что флаг running будет False + self.log("[INFO] TFTP сервер остановлен.") def handle_request(self, data, client_addr): """ @@ -193,85 +267,122 @@ class TFTPServer: transfer_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) transfer_socket.settimeout(5.0) # Таймаут ожидания ACK + # Добавляем сокет в множество активных сокетов + with self.lock: + self.transfer_sockets.add(transfer_socket) + block_num = 1 bytes_sent = 0 last_progress_time = time.time() - with open(file_path, "rb") as f: - while True: - try: - data_block = f.read(BLOCK_SIZE) - if not data_block: # Достигнут конец файла - break + try: + with open(file_path, "rb") as f: + while self.running: # Проверяем флаг running + try: + data_block = f.read(BLOCK_SIZE) + if not data_block: # Достигнут конец файла + break - # Формируем TFTP пакет данных - packet = struct.pack("!HH", 3, block_num) + data_block - attempts = 0 - ack_received = False + # Проверяем флаг running перед отправкой блока + if not self.running: + raise Exception("Передача прервана: сервер остановлен") - # Попытка отправки текущего блока (до 3 повторных попыток) - while attempts < 3 and not ack_received: - try: - transfer_socket.sendto(packet, client_addr) - - # Логируем прогресс каждую секунду - current_time = time.time() - if current_time - last_progress_time >= 1.0: - elapsed_time = current_time - start_time - remaining = filesize - bytes_sent - self.log(f"[STATUS] Клиент: {client_addr} | Файл: {file_basename} | " - f"Отправлено: {bytes_sent}/{filesize} байт | " - f"Осталось: {remaining} байт | " - f"Время: {elapsed_time:.2f} сек.") - last_progress_time = current_time + # Формируем TFTP пакет данных + packet = struct.pack("!HH", 3, block_num) + data_block + attempts = 0 + ack_received = False - # Ожидаем подтверждение - ack_data, addr = transfer_socket.recvfrom(4) # Ожидаем только 4 байта для ACK - if addr == client_addr: - ack_opcode, ack_block = struct.unpack("!HH", ack_data) - if ack_opcode == 4 and ack_block == block_num: - ack_received = True - bytes_sent += len(data_block) - with self.lock: - if client_addr in self.active_transfers: - self.active_transfers[client_addr]['bytes_sent'] = bytes_sent - else: - self.log(f"[WARN] Неверный ACK от {client_addr}. " - f"Ожидался блок {block_num}, получен {ack_block}.") - except socket.timeout: - attempts += 1 - self.log(f"[WARN] Таймаут ожидания ACK для блока {block_num} " - f"от {client_addr}. Попытка {attempts+1}.") - except Exception as e: - self.log(f"[ERROR] Ошибка при отправке блока {block_num}: {str(e)}") - attempts += 1 + # Попытка отправки текущего блока (до 3 повторных попыток) + while attempts < 3 and not ack_received and self.running: + if transfer_socket is None: + raise Exception("Сокет передачи закрыт") + + try: + transfer_socket.sendto(packet, client_addr) + + # Логируем прогресс каждую секунду + current_time = time.time() + if current_time - last_progress_time >= 1.0: + elapsed_time = current_time - start_time + remaining = filesize - bytes_sent + self.log(f"[STATUS] Клиент: {client_addr} | Файл: {file_basename} | " + f"Отправлено: {bytes_sent}/{filesize} байт | " + f"Осталось: {remaining} байт | " + f"Время: {elapsed_time:.2f} сек.") + last_progress_time = current_time - if not ack_received: - raise Exception(f"Не удалось получить подтверждение для блока {block_num}") + # Ожидаем подтверждение + ack_data, addr = transfer_socket.recvfrom(4) + if addr == client_addr: + ack_opcode, ack_block = struct.unpack("!HH", ack_data) + if ack_opcode == 4 and ack_block == block_num: + ack_received = True + bytes_sent += len(data_block) + with self.lock: + if client_addr in self.active_transfers: + self.active_transfers[client_addr]['bytes_sent'] = bytes_sent + else: + self.log(f"[WARN] Неверный ACK от {client_addr}. " + f"Ожидался блок {block_num}, получен {ack_block}.") + except socket.timeout: + attempts += 1 + self.log(f"[WARN] Таймаут ожидания ACK для блока {block_num} " + f"от {client_addr}. Попытка {attempts+1}.") + except socket.error as e: + if not self.running: + raise Exception("Передача прервана: сервер остановлен") + self.log(f"[ERROR] Ошибка сокета при отправке блока {block_num}: {str(e)}") + attempts += 1 + except Exception as e: + if not self.running: + raise Exception("Передача прервана: сервер остановлен") + self.log(f"[ERROR] Ошибка при отправке блока {block_num}: {str(e)}") + attempts += 1 - block_num = (block_num + 1) % 65536 + if not ack_received: + raise Exception(f"Не удалось получить подтверждение для блока {block_num}") - except Exception as e: - self.log(f"[ERROR] Ошибка при обработке блока {block_num}: {str(e)}") - raise + block_num = (block_num + 1) % 65536 - elapsed_time = time.time() - start_time - self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} " - f"завершена за {elapsed_time:.2f} сек. Всего отправлено {bytes_sent} байт.") + except Exception as e: + if not self.running: + raise Exception("Передача прервана: сервер остановлен") + raise + + if bytes_sent == filesize: + elapsed_time = time.time() - start_time + self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} " + f"завершена за {elapsed_time:.2f} сек. Всего отправлено {bytes_sent} байт.") + except Exception as e: + if not self.running: + self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} прервана: сервер остановлен") + else: + self.log(f"[ERROR] Ошибка при передаче файла '{file_basename}' " + f"клиенту {client_addr}: {str(e)}") + raise except Exception as e: - self.log(f"[ERROR] Ошибка при передаче файла '{os.path.basename(file_path)}' " - f"клиенту {client_addr}: {str(e)}") - try: - self.send_error(client_addr, 0, str(e)) - except: - pass - finally: - if transfer_socket: + if not self.running: + self.log(f"[INFO] Передача файла прервана: сервер остановлен") + else: + self.log(f"[ERROR] Ошибка при передаче файла '{os.path.basename(file_path)}' " + f"клиенту {client_addr}: {str(e)}") try: - transfer_socket.close() + self.send_error(client_addr, 0, str(e)) except: pass + finally: + # Закрываем сокет передачи + if transfer_socket: + try: + with self.lock: + self.transfer_sockets.discard(transfer_socket) + transfer_socket.close() + transfer_socket = None + except: + pass + + # Удаляем информацию о передаче with self.lock: if client_addr in self.active_transfers: del self.active_transfers[client_addr] \ No newline at end of file