ComConfigCopy/TFTPServer.py
Lowa 136c7877d3 Refactor TFTP file transfer with improved reliability and error handling
- Implement more robust file transfer mechanism with configurable retry and timeout settings
- Add detailed logging for transfer progress and error scenarios
- Enhance block transfer logic with better error recovery
- Simplify transfer socket management and cleanup process
- Improve overall transfer reliability and error tracking
2025-02-16 03:50:27 +03:00

349 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TFTP сервер для передачи прошивки с компьютера на коммутатор.
- Создает сервер по заданному IP и порту.
- Расшаривает папку Firmware.
- Показывает текущее состояние сервера и статус передачи файла:
- кому (IP устройства),
- сколько осталось байт,
- сколько передано байт,
- время передачи.
"""
import os
import socket
import struct
import threading
import time
class TFTPServer:
def __init__(self, share_folder):
"""
Инициализация TFTP сервера.
:param share_folder: Путь к папке, содержащей файлы (например, папка 'Firmware')
"""
self.share_folder = share_folder
self.log_callback = None
self.running = False
self.server_socket = None
self.lock = threading.Lock()
self.transfer_sockets = set() # Множество для хранения всех активных сокетов передачи
# Словарь активных передач для мониторинга их статуса.
# Ключ адрес клиента, значение словарь с информацией о передаче.
self.active_transfers = {}
def set_log_callback(self, callback):
"""
Установка функции обратного вызова для логирования сообщений.
:param callback: Функция, принимающая строку сообщения.
"""
self.log_callback = callback
def log(self, message):
"""
Функция логирования: вызывает callback (если задан) или выводит сообщение в консоль.
:param message: Строка с сообщением для логирования.
"""
if self.log_callback:
self.log_callback(message)
else:
print(message)
def start_server(self, ip, port):
"""
Запуск TFTP сервера на указанном IP и порту.
:param ip: IP-адрес для привязки сервера.
:param port: Порт для TFTP сервера.
"""
if self.running:
self.log("[WARN] Сервер уже запущен")
return
self.running = True
try:
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.server_socket.bind((ip, port))
self.log(f"[INFO] TFTP сервер запущен на {ip}:{port}")
while self.running:
try:
self.server_socket.settimeout(1.0)
data, client_addr = self.server_socket.recvfrom(2048)
if data and self.running:
threading.Thread(target=self.handle_request, args=(data, client_addr), daemon=True).start()
except socket.timeout:
continue
except socket.error as e:
if self.running: # Логируем ошибку только если сервер еще запущен
self.log(f"[ERROR] Ошибка получения данных: {str(e)}")
break
except Exception as e:
if self.running: # Логируем ошибку только если сервер еще запущен
self.log(f"[ERROR] Ошибка получения данных: {str(e)}")
break
except Exception as e:
self.log(f"[ERROR] Ошибка запуска сервера: {str(e)}")
finally:
self.running = False
if self.server_socket:
try:
self.server_socket.close()
except:
pass
self.server_socket = None
def stop_server(self):
"""
Остановка TFTP сервера.
"""
if not self.running:
return
self.log("[INFO] Остановка TFTP сервера...")
self.running = False
try:
# Закрываем основной сокет сервера первым
if self.server_socket:
try:
# Создаем временный сокет и отправляем пакет самому себе,
# чтобы разблокировать recvfrom
temp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
server_address = self.server_socket.getsockname()
temp_socket.sendto(b'', server_address)
except:
pass
finally:
try:
temp_socket.close()
except:
pass
try:
self.server_socket.close()
except Exception as e:
self.log(f"[WARN] Ошибка при закрытии основного сокета: {str(e)}")
finally:
self.server_socket = None
# Закрываем все активные сокеты передачи
with self.lock:
active_sockets = list(self.transfer_sockets)
self.transfer_sockets.clear()
active_transfers = dict(self.active_transfers)
self.active_transfers.clear()
# Закрываем сокеты передачи после очистки множества
for sock in active_sockets:
try:
if sock:
sock.close()
except Exception as e:
self.log(f"[WARN] Ошибка при закрытии сокета передачи: {str(e)}")
# Отправляем сообщения об остановке для активных передач
for client_addr, transfer_info in active_transfers.items():
try:
self.send_error(client_addr, 0, "Сервер остановлен")
except:
pass
except Exception as e:
self.log(f"[ERROR] Ошибка при остановке сервера: {str(e)}")
finally:
self.running = False # Гарантируем, что флаг running будет False
self.log("[INFO] TFTP сервер остановлен")
def handle_request(self, data, client_addr):
"""
Обработка входящего запроса от клиента.
:param data: Полученные данные (UDP-пакет).
:param client_addr: Адрес клиента, отправившего пакет.
"""
if len(data) < 2:
self.log(f"[WARN] Получен некорректный пакет от {client_addr}")
return
opcode = struct.unpack("!H", data[:2])[0]
if opcode == 1: # RRQ (Read Request) запрос на чтение файла
self.handle_rrq(data, client_addr)
else:
self.log(f"[WARN] Неподдерживаемый запрос (опкод {opcode}) от {client_addr}")
def handle_rrq(self, data, client_addr):
"""
Обработка запроса на чтение файла (RRQ).
:param data: Данные запроса.
:param client_addr: Адрес клиента.
"""
try:
# RRQ формата: 2 байта опкода, затем строка имени файла, за которой следует 0,
# затем строка режима (например, "octet"), и завершается 0.
parts = data[2:].split(b'\0')
if len(parts) < 2:
self.log(f"[WARN] Некорректный RRQ пакет от {client_addr}")
return
filename = parts[0].decode('utf-8')
mode = parts[1].decode('utf-8').lower()
self.log(f"[INFO] Получен RRQ от {client_addr}: файл '{filename}', режим '{mode}'")
if mode != "octet":
self.send_error(client_addr, 0, "Поддерживается только octet режим")
return
file_path = os.path.join(self.share_folder, filename)
if not os.path.isfile(file_path):
self.send_error(client_addr, 1, "Файл не найден")
return
# Запускаем передачу файла в новом потоке.
threading.Thread(target=self.send_file, args=(file_path, client_addr), daemon=True).start()
except Exception as e:
self.log(f"[ERROR] Ошибка обработки RRQ: {str(e)}")
def send_error(self, client_addr, error_code, error_message):
"""
Отправка сообщения об ошибке клиенту.
:param client_addr: Адрес клиента.
:param error_code: Код ошибки.
:param error_message: Текст ошибки.
"""
# Формируем TFTP пакет ошибки: 2 байта опкода (5), 2 байта кода ошибки, сообщение об ошибке и завершающий 0.
packet = struct.pack("!HH", 5, error_code) + error_message.encode('utf-8') + b'\0'
self.server_socket.sendto(packet, client_addr)
self.log(f"[INFO] Отправлено сообщение об ошибке '{error_message}' клиенту {client_addr}")
def send_file(self, file_path, client_addr):
"""
Передача файла клиенту по протоколу TFTP.
"""
BLOCK_SIZE = 512
MAX_RETRIES = 5
TIMEOUT = 2.0
transfer_socket = None
try:
if not os.path.exists(file_path):
self.log(f"[ERROR] Файл '{file_path}' не существует")
self.send_error(client_addr, 1, "Файл не найден")
return
filesize = os.path.getsize(file_path)
if filesize == 0:
self.log(f"[ERROR] Файл '{file_path}' пуст")
self.send_error(client_addr, 0, "Файл пуст")
return
start_time = time.time()
file_basename = os.path.basename(file_path)
# Регистрируем активную передачу
with self.lock:
self.active_transfers[client_addr] = {
'filename': file_basename,
'filesize': filesize,
'bytes_sent': 0,
'start_time': start_time
}
# Создаем новый сокет для передачи данных
transfer_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
transfer_socket.settimeout(TIMEOUT)
with self.lock:
self.transfer_sockets.add(transfer_socket)
self.log(f"[INFO] Начало передачи файла '{file_basename}' клиенту {client_addr}. Размер файла: {filesize} байт.")
with open(file_path, 'rb') as file:
block_number = 1
last_successful_block = 0
while True:
# Читаем блок данных
data = file.read(BLOCK_SIZE)
# Формируем и отправляем пакет данных
packet = struct.pack('!HH', 3, block_number) + data
retries = 0
while retries < MAX_RETRIES:
try:
transfer_socket.sendto(packet, client_addr)
# Ожидаем подтверждение
while True:
try:
ack_data, ack_addr = transfer_socket.recvfrom(4)
if ack_addr == client_addr and len(ack_data) >= 4:
opcode, ack_block = struct.unpack('!HH', ack_data)
if opcode == 4: # ACK
if ack_block == block_number:
# Успешное подтверждение
last_successful_block = block_number
bytes_sent = min((block_number * BLOCK_SIZE), filesize)
# Обновляем информацию о прогрессе
with self.lock:
if client_addr in self.active_transfers:
self.active_transfers[client_addr]['bytes_sent'] = bytes_sent
# Логируем статус каждую секунду
current_time = time.time()
if current_time - start_time >= 1.0:
bytes_remaining = filesize - bytes_sent
elapsed_time = current_time - start_time
self.log(f"[STATUS] Клиент: {client_addr} | Файл: {file_basename} | "
f"Отправлено: {bytes_sent}/{filesize} байт | "
f"Осталось: {bytes_remaining} байт | "
f"Время: {elapsed_time:.2f} сек.")
break
elif ack_block < block_number:
# Получен старый ACK, игнорируем
continue
except socket.timeout:
break
if last_successful_block == block_number:
break
else:
retries += 1
self.log(f"[WARN] Таймаут ожидания ACK для блока {block_number} от {client_addr}. "
f"Попытка {retries + 1}.")
except Exception as e:
retries += 1
self.log(f"[ERROR] Ошибка при передаче блока {block_number}: {str(e)}")
if retries >= MAX_RETRIES:
self.log(f"[ERROR] Превышено максимальное количество попыток для блока {block_number}")
return
block_number += 1
# Если отправили меньше BLOCK_SIZE байт, это был последний блок
if len(data) < BLOCK_SIZE:
break
self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} завершена успешно")
except Exception as e:
self.log(f"[ERROR] Ошибка при передаче файла: {str(e)}")
finally:
# Очищаем информацию о передаче
with self.lock:
if client_addr in self.active_transfers:
del self.active_transfers[client_addr]
if transfer_socket in self.transfer_sockets:
self.transfer_sockets.remove(transfer_socket)
if transfer_socket:
try:
transfer_socket.close()
except:
pass