- Implement more robust file transfer mechanism with configurable retry and timeout settings - Add detailed logging for transfer progress and error scenarios - Enhance block transfer logic with better error recovery - Simplify transfer socket management and cleanup process - Improve overall transfer reliability and error tracking
349 lines
17 KiB
Python
349 lines
17 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
TFTP сервер для передачи прошивки с компьютера на коммутатор.
|
||
|
||
- Создает сервер по заданному IP и порту.
|
||
- Расшаривает папку Firmware.
|
||
- Показывает текущее состояние сервера и статус передачи файла:
|
||
- кому (IP устройства),
|
||
- сколько осталось байт,
|
||
- сколько передано байт,
|
||
- время передачи.
|
||
"""
|
||
|
||
import os
|
||
import socket
|
||
import struct
|
||
import threading
|
||
import time
|
||
|
||
class TFTPServer:
|
||
def __init__(self, share_folder):
|
||
"""
|
||
Инициализация TFTP сервера.
|
||
|
||
:param share_folder: Путь к папке, содержащей файлы (например, папка 'Firmware')
|
||
"""
|
||
self.share_folder = share_folder
|
||
self.log_callback = None
|
||
self.running = False
|
||
self.server_socket = None
|
||
self.lock = threading.Lock()
|
||
self.transfer_sockets = set() # Множество для хранения всех активных сокетов передачи
|
||
# Словарь активных передач для мониторинга их статуса.
|
||
# Ключ – адрес клиента, значение – словарь с информацией о передаче.
|
||
self.active_transfers = {}
|
||
|
||
def set_log_callback(self, callback):
|
||
"""
|
||
Установка функции обратного вызова для логирования сообщений.
|
||
|
||
:param callback: Функция, принимающая строку сообщения.
|
||
"""
|
||
self.log_callback = callback
|
||
|
||
def log(self, message):
|
||
"""
|
||
Функция логирования: вызывает callback (если задан) или выводит сообщение в консоль.
|
||
|
||
:param message: Строка с сообщением для логирования.
|
||
"""
|
||
if self.log_callback:
|
||
self.log_callback(message)
|
||
else:
|
||
print(message)
|
||
|
||
def start_server(self, ip, port):
|
||
"""
|
||
Запуск TFTP сервера на указанном IP и порту.
|
||
|
||
:param ip: IP-адрес для привязки сервера.
|
||
:param port: Порт для TFTP сервера.
|
||
"""
|
||
if self.running:
|
||
self.log("[WARN] Сервер уже запущен")
|
||
return
|
||
|
||
self.running = True
|
||
try:
|
||
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
self.server_socket.bind((ip, port))
|
||
self.log(f"[INFO] TFTP сервер запущен на {ip}:{port}")
|
||
|
||
while self.running:
|
||
try:
|
||
self.server_socket.settimeout(1.0)
|
||
data, client_addr = self.server_socket.recvfrom(2048)
|
||
if data and self.running:
|
||
threading.Thread(target=self.handle_request, args=(data, client_addr), daemon=True).start()
|
||
except socket.timeout:
|
||
continue
|
||
except socket.error as e:
|
||
if self.running: # Логируем ошибку только если сервер еще запущен
|
||
self.log(f"[ERROR] Ошибка получения данных: {str(e)}")
|
||
break
|
||
except Exception as e:
|
||
if self.running: # Логируем ошибку только если сервер еще запущен
|
||
self.log(f"[ERROR] Ошибка получения данных: {str(e)}")
|
||
break
|
||
except Exception as e:
|
||
self.log(f"[ERROR] Ошибка запуска сервера: {str(e)}")
|
||
finally:
|
||
self.running = False
|
||
if self.server_socket:
|
||
try:
|
||
self.server_socket.close()
|
||
except:
|
||
pass
|
||
self.server_socket = None
|
||
|
||
def stop_server(self):
|
||
"""
|
||
Остановка TFTP сервера.
|
||
"""
|
||
if not self.running:
|
||
return
|
||
|
||
self.log("[INFO] Остановка TFTP сервера...")
|
||
self.running = False
|
||
|
||
try:
|
||
# Закрываем основной сокет сервера первым
|
||
if self.server_socket:
|
||
try:
|
||
# Создаем временный сокет и отправляем пакет самому себе,
|
||
# чтобы разблокировать recvfrom
|
||
temp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
try:
|
||
server_address = self.server_socket.getsockname()
|
||
temp_socket.sendto(b'', server_address)
|
||
except:
|
||
pass
|
||
finally:
|
||
try:
|
||
temp_socket.close()
|
||
except:
|
||
pass
|
||
|
||
try:
|
||
self.server_socket.close()
|
||
except Exception as e:
|
||
self.log(f"[WARN] Ошибка при закрытии основного сокета: {str(e)}")
|
||
finally:
|
||
self.server_socket = None
|
||
|
||
# Закрываем все активные сокеты передачи
|
||
with self.lock:
|
||
active_sockets = list(self.transfer_sockets)
|
||
self.transfer_sockets.clear()
|
||
active_transfers = dict(self.active_transfers)
|
||
self.active_transfers.clear()
|
||
|
||
# Закрываем сокеты передачи после очистки множества
|
||
for sock in active_sockets:
|
||
try:
|
||
if sock:
|
||
sock.close()
|
||
except Exception as e:
|
||
self.log(f"[WARN] Ошибка при закрытии сокета передачи: {str(e)}")
|
||
|
||
# Отправляем сообщения об остановке для активных передач
|
||
for client_addr, transfer_info in active_transfers.items():
|
||
try:
|
||
self.send_error(client_addr, 0, "Сервер остановлен")
|
||
except:
|
||
pass
|
||
|
||
except Exception as e:
|
||
self.log(f"[ERROR] Ошибка при остановке сервера: {str(e)}")
|
||
finally:
|
||
self.running = False # Гарантируем, что флаг running будет False
|
||
self.log("[INFO] TFTP сервер остановлен")
|
||
|
||
def handle_request(self, data, client_addr):
|
||
"""
|
||
Обработка входящего запроса от клиента.
|
||
|
||
:param data: Полученные данные (UDP-пакет).
|
||
:param client_addr: Адрес клиента, отправившего пакет.
|
||
"""
|
||
if len(data) < 2:
|
||
self.log(f"[WARN] Получен некорректный пакет от {client_addr}")
|
||
return
|
||
opcode = struct.unpack("!H", data[:2])[0]
|
||
if opcode == 1: # RRQ (Read Request) – запрос на чтение файла
|
||
self.handle_rrq(data, client_addr)
|
||
else:
|
||
self.log(f"[WARN] Неподдерживаемый запрос (опкод {opcode}) от {client_addr}")
|
||
|
||
def handle_rrq(self, data, client_addr):
|
||
"""
|
||
Обработка запроса на чтение файла (RRQ).
|
||
|
||
:param data: Данные запроса.
|
||
:param client_addr: Адрес клиента.
|
||
"""
|
||
try:
|
||
# RRQ формата: 2 байта опкода, затем строка имени файла, за которой следует 0,
|
||
# затем строка режима (например, "octet"), и завершается 0.
|
||
parts = data[2:].split(b'\0')
|
||
if len(parts) < 2:
|
||
self.log(f"[WARN] Некорректный RRQ пакет от {client_addr}")
|
||
return
|
||
filename = parts[0].decode('utf-8')
|
||
mode = parts[1].decode('utf-8').lower()
|
||
self.log(f"[INFO] Получен RRQ от {client_addr}: файл '{filename}', режим '{mode}'")
|
||
if mode != "octet":
|
||
self.send_error(client_addr, 0, "Поддерживается только octet режим")
|
||
return
|
||
file_path = os.path.join(self.share_folder, filename)
|
||
if not os.path.isfile(file_path):
|
||
self.send_error(client_addr, 1, "Файл не найден")
|
||
return
|
||
# Запускаем передачу файла в новом потоке.
|
||
threading.Thread(target=self.send_file, args=(file_path, client_addr), daemon=True).start()
|
||
except Exception as e:
|
||
self.log(f"[ERROR] Ошибка обработки RRQ: {str(e)}")
|
||
|
||
def send_error(self, client_addr, error_code, error_message):
|
||
"""
|
||
Отправка сообщения об ошибке клиенту.
|
||
|
||
:param client_addr: Адрес клиента.
|
||
:param error_code: Код ошибки.
|
||
:param error_message: Текст ошибки.
|
||
"""
|
||
# Формируем TFTP пакет ошибки: 2 байта опкода (5), 2 байта кода ошибки, сообщение об ошибке и завершающий 0.
|
||
packet = struct.pack("!HH", 5, error_code) + error_message.encode('utf-8') + b'\0'
|
||
self.server_socket.sendto(packet, client_addr)
|
||
self.log(f"[INFO] Отправлено сообщение об ошибке '{error_message}' клиенту {client_addr}")
|
||
|
||
def send_file(self, file_path, client_addr):
|
||
"""
|
||
Передача файла клиенту по протоколу TFTP.
|
||
"""
|
||
BLOCK_SIZE = 512
|
||
MAX_RETRIES = 5
|
||
TIMEOUT = 2.0
|
||
transfer_socket = None
|
||
try:
|
||
if not os.path.exists(file_path):
|
||
self.log(f"[ERROR] Файл '{file_path}' не существует")
|
||
self.send_error(client_addr, 1, "Файл не найден")
|
||
return
|
||
|
||
filesize = os.path.getsize(file_path)
|
||
if filesize == 0:
|
||
self.log(f"[ERROR] Файл '{file_path}' пуст")
|
||
self.send_error(client_addr, 0, "Файл пуст")
|
||
return
|
||
|
||
start_time = time.time()
|
||
file_basename = os.path.basename(file_path)
|
||
|
||
# Регистрируем активную передачу
|
||
with self.lock:
|
||
self.active_transfers[client_addr] = {
|
||
'filename': file_basename,
|
||
'filesize': filesize,
|
||
'bytes_sent': 0,
|
||
'start_time': start_time
|
||
}
|
||
|
||
# Создаем новый сокет для передачи данных
|
||
transfer_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
transfer_socket.settimeout(TIMEOUT)
|
||
|
||
with self.lock:
|
||
self.transfer_sockets.add(transfer_socket)
|
||
|
||
self.log(f"[INFO] Начало передачи файла '{file_basename}' клиенту {client_addr}. Размер файла: {filesize} байт.")
|
||
|
||
with open(file_path, 'rb') as file:
|
||
block_number = 1
|
||
last_successful_block = 0
|
||
|
||
while True:
|
||
# Читаем блок данных
|
||
data = file.read(BLOCK_SIZE)
|
||
|
||
# Формируем и отправляем пакет данных
|
||
packet = struct.pack('!HH', 3, block_number) + data
|
||
|
||
retries = 0
|
||
while retries < MAX_RETRIES:
|
||
try:
|
||
transfer_socket.sendto(packet, client_addr)
|
||
|
||
# Ожидаем подтверждение
|
||
while True:
|
||
try:
|
||
ack_data, ack_addr = transfer_socket.recvfrom(4)
|
||
if ack_addr == client_addr and len(ack_data) >= 4:
|
||
opcode, ack_block = struct.unpack('!HH', ack_data)
|
||
if opcode == 4: # ACK
|
||
if ack_block == block_number:
|
||
# Успешное подтверждение
|
||
last_successful_block = block_number
|
||
bytes_sent = min((block_number * BLOCK_SIZE), filesize)
|
||
|
||
# Обновляем информацию о прогрессе
|
||
with self.lock:
|
||
if client_addr in self.active_transfers:
|
||
self.active_transfers[client_addr]['bytes_sent'] = bytes_sent
|
||
|
||
# Логируем статус каждую секунду
|
||
current_time = time.time()
|
||
if current_time - start_time >= 1.0:
|
||
bytes_remaining = filesize - bytes_sent
|
||
elapsed_time = current_time - start_time
|
||
self.log(f"[STATUS] Клиент: {client_addr} | Файл: {file_basename} | "
|
||
f"Отправлено: {bytes_sent}/{filesize} байт | "
|
||
f"Осталось: {bytes_remaining} байт | "
|
||
f"Время: {elapsed_time:.2f} сек.")
|
||
|
||
break
|
||
elif ack_block < block_number:
|
||
# Получен старый ACK, игнорируем
|
||
continue
|
||
except socket.timeout:
|
||
break
|
||
|
||
if last_successful_block == block_number:
|
||
break
|
||
else:
|
||
retries += 1
|
||
self.log(f"[WARN] Таймаут ожидания ACK для блока {block_number} от {client_addr}. "
|
||
f"Попытка {retries + 1}.")
|
||
except Exception as e:
|
||
retries += 1
|
||
self.log(f"[ERROR] Ошибка при передаче блока {block_number}: {str(e)}")
|
||
|
||
if retries >= MAX_RETRIES:
|
||
self.log(f"[ERROR] Превышено максимальное количество попыток для блока {block_number}")
|
||
return
|
||
|
||
block_number += 1
|
||
|
||
# Если отправили меньше BLOCK_SIZE байт, это был последний блок
|
||
if len(data) < BLOCK_SIZE:
|
||
break
|
||
|
||
self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} завершена успешно")
|
||
|
||
except Exception as e:
|
||
self.log(f"[ERROR] Ошибка при передаче файла: {str(e)}")
|
||
finally:
|
||
# Очищаем информацию о передаче
|
||
with self.lock:
|
||
if client_addr in self.active_transfers:
|
||
del self.active_transfers[client_addr]
|
||
if transfer_socket in self.transfer_sockets:
|
||
self.transfer_sockets.remove(transfer_socket)
|
||
|
||
if transfer_socket:
|
||
try:
|
||
transfer_socket.close()
|
||
except:
|
||
pass |