Files
ComConfigCopy/TFTPServer.py
Lowa 299ce329f7 Add TFTP server functionality to the application
- Implement TFTP server tab with IP and port configuration
- Create methods to start and stop TFTP server
- Add logging functionality for TFTP server events
- Integrate TFTPServer class into the main application
- Re-enable Firmware directory creation
2025-02-16 02:51:47 +03:00

178 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import tftpy
import logging
from typing import Optional, Callable, Dict
from dataclasses import dataclass
from datetime import datetime
@dataclass
class FileTransfer:
filename: str
client_address: tuple
start_time: datetime
total_blocks: int = 0
current_block: int = 0
status: str = "в процессе"
class TFTPServer:
def __init__(self, root_path: str):
"""
Инициализация TFTP сервера
Args:
root_path (str): Путь к корневой директории для файлов
"""
self.root_path = root_path
self.server: Optional[tftpy.TftpServer] = None
self.logger = logging.getLogger(__name__)
self.log_callback: Optional[Callable[[str], None]] = None
self.progress_callback: Optional[Callable[[str, tuple, int, int, str], None]] = None
self.active_transfers: Dict[str, FileTransfer] = {}
def set_log_callback(self, callback: Callable[[str], None]):
"""Установка функции обратного вызова для логирования"""
self.log_callback = callback
def set_progress_callback(self, callback: Callable[[str, tuple, int, int, str], None]):
"""Установка функции обратного вызова для отображения прогресса"""
self.progress_callback = callback
def log_message(self, message: str, level: str = "INFO"):
"""Логирование сообщения"""
if self.log_callback:
self.log_callback(f"[{level}] {message}")
if level == "INFO":
self.logger.info(message)
elif level == "ERROR":
self.logger.error(message)
elif level == "WARNING":
self.logger.warning(message)
def update_progress(self, transfer: FileTransfer):
"""Обновление прогресса передачи файла"""
if self.progress_callback:
self.progress_callback(
transfer.filename,
transfer.client_address,
transfer.current_block,
transfer.total_blocks,
transfer.status
)
def start_server(self, ip: str = "0.0.0.0", port: int = 69):
"""
Запуск TFTP сервера
Args:
ip (str): IP адрес для прослушивания (по умолчанию "0.0.0.0")
port (int): Порт для прослушивания (по умолчанию 69)
"""
try:
if not os.path.exists(self.root_path):
os.makedirs(self.root_path)
# Создаем серверный класс с обработчиками событий
server = tftpy.TftpServer(self.root_path)
# Добавляем обработчики событий
def on_read_request(filename: str, client_address: tuple):
self.log_message(f"Получен запрос на скачивание файла '{filename}' от {client_address[0]}:{client_address[1]}")
file_path = os.path.join(self.root_path, filename)
if not os.path.exists(file_path):
self.log_message(f"Файл '{filename}' не найден", "ERROR")
return False
# Создаем запись о передаче файла
file_size = os.path.getsize(file_path)
total_blocks = (file_size + 511) // 512 # Размер блока TFTP = 512 байт
transfer = FileTransfer(
filename=filename,
client_address=client_address,
start_time=datetime.now(),
total_blocks=total_blocks
)
self.active_transfers[f"{filename}_{client_address}"] = transfer
self.update_progress(transfer)
return True
def on_write_request(filename: str, client_address: tuple):
self.log_message(f"Получен запрос на загрузку файла '{filename}' от {client_address[0]}:{client_address[1]}")
transfer = FileTransfer(
filename=filename,
client_address=client_address,
start_time=datetime.now()
)
self.active_transfers[f"{filename}_{client_address}"] = transfer
self.update_progress(transfer)
return True
def on_read_block_sent(filename: str, block_number: int, client_address: tuple):
key = f"{filename}_{client_address}"
if key in self.active_transfers:
transfer = self.active_transfers[key]
transfer.current_block = block_number
if transfer.current_block >= transfer.total_blocks:
transfer.status = "завершено"
del self.active_transfers[key]
self.update_progress(transfer)
self.log_message(f"Отправлен блок {block_number} файла '{filename}' клиенту {client_address[0]}:{client_address[1]}")
def on_write_block_received(filename: str, block_number: int, client_address: tuple):
key = f"{filename}_{client_address}"
if key in self.active_transfers:
transfer = self.active_transfers[key]
transfer.current_block = block_number
if block_number == 1: # Первый блок
file_path = os.path.join(self.root_path, filename)
if os.path.exists(file_path):
transfer.total_blocks = (os.path.getsize(file_path) + 511) // 512
if transfer.current_block >= transfer.total_blocks:
transfer.status = "завершено"
del self.active_transfers[key]
self.update_progress(transfer)
self.log_message(f"Получен блок {block_number} файла '{filename}' от клиента {client_address[0]}:{client_address[1]}")
def on_error(error: Exception, client_address: tuple):
# Помечаем все активные передачи для этого клиента как ошибочные
for key, transfer in list(self.active_transfers.items()):
if transfer.client_address == client_address:
transfer.status = f"ошибка: {str(error)}"
self.update_progress(transfer)
del self.active_transfers[key]
self.log_message(f"Ошибка при обработке запроса от {client_address[0]}:{client_address[1]}: {str(error)}", "ERROR")
# Устанавливаем обработчики
server.on_read_request = on_read_request
server.on_write_request = on_write_request
server.on_read_block_sent = on_read_block_sent
server.on_write_block_received = on_write_block_received
server.on_error = on_error
self.server = server
self.log_message(f"Запуск TFTP сервера на {ip}:{port}")
self.server.listen(ip, port)
except Exception as e:
error_msg = f"Ошибка при запуске TFTP сервера: {str(e)}"
self.log_message(error_msg, "ERROR")
raise
def stop_server(self):
"""Остановка TFTP сервера"""
if self.server:
try:
# Помечаем все активные передачи как прерванные
for transfer in self.active_transfers.values():
transfer.status = "прервано"
self.update_progress(transfer)
self.active_transfers.clear()
self.server.stop()
self.log_message("TFTP сервер остановлен")
except Exception as e:
error_msg = f"Ошибка при остановке TFTP сервера: {str(e)}"
self.log_message(error_msg, "ERROR")
raise