- Refactor UpdateChecker - Add support for parsing release details with improved formatting - Implement more robust version comparison and release type handling - Add logging for update checking process - Improve error handling and release information extraction - Update update checking logic to handle stable and pre-release versions
1288 lines
60 KiB
Python
1288 lines
60 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
# ------------------------------------------------------------
|
||
# Это программа для копирования конфигураций на коммутаторы
|
||
# ------------------------------------------------------------
|
||
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import sys
|
||
import threading
|
||
import time
|
||
import webbrowser
|
||
from getpass import getpass
|
||
from logging.handlers import RotatingFileHandler
|
||
import tkinter as tk
|
||
from tkinter import (
|
||
StringVar,
|
||
END,
|
||
BOTH,
|
||
LEFT,
|
||
X,
|
||
W,
|
||
filedialog,
|
||
messagebox,
|
||
simpledialog,
|
||
)
|
||
from tkinter import ttk
|
||
|
||
import serial
|
||
import serial.tools.list_ports
|
||
from serial.serialutil import SerialException
|
||
from about_window import AboutWindow
|
||
from TFTPServer import TFTPServer
|
||
import socket
|
||
from update_checker import UpdateChecker
|
||
|
||
# Версия программы
|
||
VERSION = "1.0.1"
|
||
|
||
# Создаем необходимые папки
|
||
os.makedirs("Logs", exist_ok=True)
|
||
os.makedirs("Configs", exist_ok=True)
|
||
os.makedirs("Settings", exist_ok=True)
|
||
os.makedirs("Firmware", exist_ok=True)
|
||
os.makedirs("docs", exist_ok=True)
|
||
|
||
# Файл настроек находится в папке Settings
|
||
SETTINGS_FILE = os.path.join("Settings", "settings.json")
|
||
|
||
# Настройка логирования с использованием RotatingFileHandler.
|
||
def setup_logging():
|
||
logger = logging.getLogger()
|
||
logger.setLevel(logging.DEBUG)
|
||
log_path = os.path.join("Logs", "app.log")
|
||
handler = RotatingFileHandler(
|
||
log_path, maxBytes=5 * 1024 * 1024, backupCount=3, encoding="utf-8"
|
||
)
|
||
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
|
||
handler.setFormatter(formatter)
|
||
logger.addHandler(handler)
|
||
|
||
# Загрузка настроек из JSON-файла или создание настроек по умолчанию.
|
||
def settings_load():
|
||
default_settings = {
|
||
"port": None, # Порт для подключения
|
||
"baudrate": 9600, # Скорость передачи данных
|
||
"config_file": None, # Файл конфигурации
|
||
"login": None, # Логин для подключения
|
||
"password": None, # Пароль для подключения
|
||
"timeout": 10, # Таймаут подключения
|
||
"copy_mode": "line", # Режим копирования
|
||
"block_size": 15, # Размер блока команд
|
||
"prompt": ">", # Используется для определения приглашения
|
||
}
|
||
|
||
# Создаем папку Settings, если её нет
|
||
os.makedirs("Settings", exist_ok=True)
|
||
|
||
if not os.path.exists(SETTINGS_FILE):
|
||
try:
|
||
# При первом запуске создаем новый файл настроек
|
||
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(default_settings, f, indent=4, ensure_ascii=False)
|
||
logging.info("Файл настроек создан с настройками по умолчанию.")
|
||
return default_settings
|
||
except Exception as e:
|
||
logging.error(f"Ошибка при создании файла настроек: {e}", exc_info=True)
|
||
return default_settings
|
||
|
||
try:
|
||
# Пытаемся загрузить существующие настройки
|
||
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
|
||
settings = json.load(f)
|
||
|
||
# Проверяем наличие всех необходимых параметров
|
||
settings_changed = False
|
||
for key, value in default_settings.items():
|
||
if key not in settings:
|
||
settings[key] = value
|
||
settings_changed = True
|
||
|
||
# Если были добавлены новые параметры, сохраняем обновленные настройки
|
||
if settings_changed:
|
||
try:
|
||
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(settings, f, indent=4, ensure_ascii=False)
|
||
logging.info("Файл настроек обновлен с новыми параметрами.")
|
||
except Exception as e:
|
||
logging.error(f"Ошибка при обновлении файла настроек: {e}", exc_info=True)
|
||
|
||
return settings
|
||
except Exception as e:
|
||
logging.error(f"Ошибка загрузки файла настроек: {e}", exc_info=True)
|
||
return default_settings
|
||
|
||
# Сохранение настроек в JSON-файл
|
||
def settings_save(settings):
|
||
try:
|
||
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(settings, f, indent=4, ensure_ascii=False)
|
||
logging.info("Настройки сохранены в файл.")
|
||
except Exception as e:
|
||
logging.error(f"Ошибка при сохранении настроек: {e}", exc_info=True)
|
||
|
||
# Получение списка доступных последовательных портов
|
||
def list_serial_ports():
|
||
"""Получение списка доступных последовательных портов."""
|
||
ports = serial.tools.list_ports.comports()
|
||
logging.debug(f"Найдено {len(ports)} серийных портов.")
|
||
return [port.device for port in ports]
|
||
|
||
# Получение списка IP-адресов из сетевых адаптеров
|
||
def get_network_adapters():
|
||
adapters = []
|
||
try:
|
||
# Получаем имя хоста
|
||
hostname = socket.gethostname()
|
||
# Получаем все адреса для данного хоста
|
||
addresses = socket.getaddrinfo(hostname, None)
|
||
|
||
# Создаем множество для хранения уникальных IP-адресов
|
||
unique_ips = set()
|
||
|
||
for addr in addresses:
|
||
ip = addr[4][0]
|
||
# Пропускаем IPv6 и локальные адреса
|
||
if ':' not in ip and not ip.startswith('127.'):
|
||
unique_ips.add(ip)
|
||
|
||
# Добавляем все найденные IP-адреса в список
|
||
for ip in sorted(unique_ips):
|
||
adapters.append(f"{ip}")
|
||
|
||
# Добавляем 0.0.0.0 для прослушивания всех интерфейсов
|
||
adapters.insert(0, "0.0.0.0")
|
||
|
||
except Exception as e:
|
||
logging.error(f"Ошибка при получении списка сетевых адаптеров: {e}", exc_info=True)
|
||
# В случае ошибки возвращаем хотя бы 0.0.0.0
|
||
adapters = ["0.0.0.0"]
|
||
|
||
return adapters
|
||
|
||
# Создание соединения с устройством через последовательный порт
|
||
def create_connection(settings):
|
||
try:
|
||
conn = serial.Serial(settings["port"], settings["baudrate"], timeout=1)
|
||
logging.info(f"Соединение установлено с {settings['port']} на {settings['baudrate']}.")
|
||
time.sleep(1)
|
||
return conn
|
||
except SerialException as e:
|
||
logging.error(f"Ошибка подключения: {e}", exc_info=True)
|
||
except Exception as e:
|
||
logging.error(f"Неизвестная ошибка подключения: {e}", exc_info=True)
|
||
return None
|
||
|
||
# Проверка наличия логина и пароля в настройках и отправка их на устройство
|
||
def send_login_password(serial_connection, login=None, password=None, is_gui=False):
|
||
if not login:
|
||
if is_gui:
|
||
login = simpledialog.askstring("Login", "Введите логин:")
|
||
if login is None:
|
||
login = ""
|
||
else:
|
||
login = input("Введите логин: ")
|
||
|
||
if not password:
|
||
if is_gui:
|
||
password = simpledialog.askstring("Password", "Введите пароль:", show="*")
|
||
if password is None:
|
||
password = ""
|
||
else:
|
||
password = getpass("Введите пароль: ")
|
||
|
||
# Чтение ответа от устройства с учётом таймаута.
|
||
def read_response(serial_connection, timeout, login=None, password=None, is_gui=False):
|
||
response = b""
|
||
end_time = time.time() + timeout
|
||
decoded = ""
|
||
while time.time() < end_time:
|
||
if serial_connection.in_waiting:
|
||
chunk = serial_connection.read(serial_connection.in_waiting)
|
||
response += chunk
|
||
|
||
if b"--More--" in response:
|
||
serial_connection.write(b"\n")
|
||
response = response.replace(b"--More--", b"")
|
||
|
||
try:
|
||
decoded = response.decode(errors="ignore")
|
||
except Exception:
|
||
decoded = ""
|
||
lines = decoded.rstrip().splitlines()
|
||
if lines:
|
||
last_line = lines[-1].strip()
|
||
|
||
if re.search(r'(login:|username:)$', last_line, re.IGNORECASE):
|
||
send_login_password(serial_connection, login, None, is_gui)
|
||
response = b""
|
||
continue
|
||
|
||
if re.search(r'(password:)$', last_line, re.IGNORECASE):
|
||
send_login_password(serial_connection, None, password, is_gui)
|
||
response = b""
|
||
continue
|
||
|
||
if last_line.endswith(">") or last_line.endswith("#"):
|
||
break
|
||
else:
|
||
time.sleep(0.1)
|
||
return decoded
|
||
|
||
# Генерация блоков команд для блочного копирования
|
||
def generate_command_blocks(lines, block_size):
|
||
blocks = []
|
||
current_block = []
|
||
for line in lines:
|
||
trimmed = line.strip()
|
||
if not trimmed:
|
||
continue
|
||
lower_line = trimmed.lower()
|
||
if lower_line.startswith("vlan") or lower_line.startswith("enable") or lower_line.startswith("interface"):
|
||
if current_block:
|
||
blocks.append("\n".join(current_block))
|
||
current_block = []
|
||
blocks.append(trimmed)
|
||
elif lower_line.startswith("exit"):
|
||
current_block.append(trimmed)
|
||
blocks.append("\n".join(current_block))
|
||
current_block = []
|
||
else:
|
||
current_block.append(trimmed)
|
||
if len(current_block) >= block_size:
|
||
blocks.append("\n".join(current_block))
|
||
current_block = []
|
||
if current_block:
|
||
blocks.append("\n".join(current_block))
|
||
return blocks
|
||
|
||
# Выполнение команд из файла конфигурации
|
||
def execute_commands_from_file(
|
||
serial_connection,
|
||
filename,
|
||
timeout,
|
||
copy_mode,
|
||
block_size,
|
||
log_callback=None,
|
||
login=None,
|
||
password=None,
|
||
is_gui=False,
|
||
):
|
||
try:
|
||
with open(filename, "r", encoding="utf-8") as file:
|
||
lines = [line for line in file if line.strip()]
|
||
msg = f"Выполнение команд из файла: {filename}\n"
|
||
logging.info(msg)
|
||
if log_callback:
|
||
log_callback(msg)
|
||
# Если выбран построчный режим
|
||
if copy_mode == "line":
|
||
for cmd in lines:
|
||
cmd = cmd.strip()
|
||
max_attempts = 3
|
||
attempt = 0
|
||
while attempt < max_attempts:
|
||
msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
serial_connection.write((cmd + "\n").encode())
|
||
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})")
|
||
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
|
||
if response:
|
||
if '^' in response:
|
||
msg = (
|
||
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
|
||
f"Ответ устройства:\n{response}\n"
|
||
f"Повторная отправка команды...\n"
|
||
)
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.")
|
||
attempt += 1
|
||
time.sleep(1)
|
||
continue
|
||
else:
|
||
msg = f"Ответ устройства:\n{response}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.info(f"Ответ устройства:\n{response}")
|
||
break
|
||
else:
|
||
msg = f"Ответ не получен для команды: {cmd}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.warning(f"Нет ответа для команды: {cmd}")
|
||
break
|
||
if attempt == max_attempts:
|
||
msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.error(msg)
|
||
time.sleep(1)
|
||
# Если выбран блочный режим
|
||
elif copy_mode == "block":
|
||
blocks = generate_command_blocks(lines, block_size)
|
||
for block in blocks:
|
||
msg = f"\nОтправка блока команд:\n{block}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
serial_connection.write((block + "\n").encode())
|
||
logging.info(f"Отправлен блок команд:\n{block}")
|
||
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
|
||
# Если обнаружена ошибка в ответе на блок, отправляем команды по очереди
|
||
if response and '^' in response:
|
||
msg = (
|
||
f"[WARNING] Обнаружена ошибка при выполнении блока команд.\n"
|
||
f"Ответ устройства:\n{response}\n"
|
||
f"Пересылаются команды по отдельности...\n"
|
||
)
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.warning("Ошибка в блочном режиме – отправляем команды индивидуально.")
|
||
for line in block.splitlines():
|
||
cmd = line.strip()
|
||
if not cmd:
|
||
continue
|
||
max_attempts = 3
|
||
attempt = 0
|
||
while attempt < max_attempts:
|
||
sub_msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n"
|
||
if log_callback:
|
||
log_callback(sub_msg)
|
||
serial_connection.write((cmd + "\n").encode())
|
||
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})")
|
||
sub_response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
|
||
if sub_response:
|
||
if '^' in sub_response:
|
||
sub_msg = (
|
||
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
|
||
f"Ответ устройства:\n{sub_response}\n"
|
||
f"Повторная отправка команды...\n"
|
||
)
|
||
if log_callback:
|
||
log_callback(sub_msg)
|
||
logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.")
|
||
attempt += 1
|
||
time.sleep(1)
|
||
continue
|
||
else:
|
||
sub_msg = f"Ответ устройства:\n{sub_response}\n"
|
||
if log_callback:
|
||
log_callback(sub_msg)
|
||
logging.info(f"Ответ устройства:\n{sub_response}")
|
||
break
|
||
else:
|
||
sub_msg = f"Ответ не получен для команды: {cmd}\n"
|
||
if log_callback:
|
||
log_callback(sub_msg)
|
||
logging.warning(f"Нет ответа для команды: {cmd}")
|
||
break
|
||
if attempt == max_attempts:
|
||
sub_msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n"
|
||
if log_callback:
|
||
log_callback(sub_msg)
|
||
logging.error(sub_msg)
|
||
time.sleep(1)
|
||
else:
|
||
if response:
|
||
msg = f"Ответ устройства:\n{response}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.info(f"Ответ устройства:\n{response}")
|
||
else:
|
||
msg = f"Ответ не получен для блока:\n{block}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.warning(f"Нет ответа для блока:\n{block}")
|
||
time.sleep(1)
|
||
except SerialException as e:
|
||
msg = f"Ошибка при выполнении команды: {e}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.error(f"Ошибка при выполнении команды: {e}", exc_info=True)
|
||
except Exception as e:
|
||
msg = f"Ошибка при выполнении команд: {e}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True)
|
||
|
||
# Базовый класс для кастомных виджетов с поддержкой контекстного меню и горячих клавиш
|
||
class CustomWidgetBase:
|
||
|
||
def __init__(self):
|
||
self.create_context_menu()
|
||
self.bind_shortcuts()
|
||
|
||
# Создание контекстного меню
|
||
def create_context_menu(self):
|
||
"""Создание контекстного меню"""
|
||
self.context_menu = tk.Menu(self, tearoff=0)
|
||
self.context_menu.add_command(label="Вырезать", command=self.cut)
|
||
self.context_menu.add_command(label="Копировать", command=self.copy)
|
||
self.context_menu.add_command(label="Вставить", command=self.paste)
|
||
self.context_menu.add_separator()
|
||
self.context_menu.add_command(label="Выделить всё", command=self.select_all)
|
||
|
||
self.bind("<Button-3>", self.show_context_menu)
|
||
|
||
# Привязка горячих клавиш
|
||
def bind_shortcuts(self):
|
||
"""Привязка горячих клавиш"""
|
||
# Стандартные сочетания
|
||
self.bind("<Control-x>", lambda e: self.event_generate("<<Cut>>"))
|
||
self.bind("<Control-c>", lambda e: self.event_generate("<<Copy>>"))
|
||
self.bind("<Control-v>", lambda e: self.event_generate("<<Paste>>"))
|
||
self.bind("<Control-a>", self.select_all)
|
||
|
||
# Дополнительные сочетания
|
||
self.bind("<Shift-Insert>", lambda e: self.event_generate("<<Paste>>"))
|
||
self.bind("<Control-Insert>", lambda e: self.event_generate("<<Copy>>"))
|
||
self.bind("<Shift-Delete>", lambda e: self.event_generate("<<Cut>>"))
|
||
|
||
# Отображение контекстного меню
|
||
def show_context_menu(self, event):
|
||
"""Отображение контекстного меню"""
|
||
self.context_menu.post(event.x_root, event.y_root)
|
||
|
||
# Вырезание текста
|
||
def cut(self):
|
||
self.event_generate("<<Cut>>")
|
||
|
||
# Копирование текста
|
||
def copy(self):
|
||
self.event_generate("<<Copy>>")
|
||
|
||
# Вставка текста
|
||
def paste(self):
|
||
self.event_generate("<<Paste>>")
|
||
|
||
# Класс для текстового поля с поддержкой контекстного меню и горячих клавиш
|
||
class CustomText(tk.Text, CustomWidgetBase):
|
||
def __init__(self, *args, **kwargs):
|
||
tk.Text.__init__(self, *args, **kwargs)
|
||
CustomWidgetBase.__init__(self)
|
||
|
||
def select_all(self, event=None):
|
||
"""Выделение всего текста в Text виджете"""
|
||
self.tag_add(tk.SEL, "1.0", tk.END)
|
||
self.mark_set(tk.INSERT, "1.0")
|
||
self.see(tk.INSERT)
|
||
return "break"
|
||
|
||
# Класс для поля ввода с поддержкой контекстного меню и горячих клавиш
|
||
class CustomEntry(ttk.Entry, CustomWidgetBase):
|
||
def __init__(self, *args, **kwargs):
|
||
ttk.Entry.__init__(self, *args, **kwargs)
|
||
CustomWidgetBase.__init__(self)
|
||
|
||
def select_all(self, event=None):
|
||
"""Выделение всего текста в Entry виджете"""
|
||
self.select_range(0, tk.END)
|
||
return "break"
|
||
|
||
# Класс для окна настроек
|
||
class SettingsWindow(tk.Toplevel):
|
||
def __init__(self, parent, settings, callback=None):
|
||
super().__init__(parent)
|
||
self.title("Настройки")
|
||
self.geometry("600x400")
|
||
self.settings = settings
|
||
self.callback = callback
|
||
self.resizable(False, False)
|
||
|
||
# Создаем фрейм для настроек
|
||
settings_frame = ttk.Frame(self, padding="10")
|
||
settings_frame.pack(fill=BOTH, expand=True)
|
||
|
||
# COM порт
|
||
ttk.Label(settings_frame, text="COM порт:").grid(row=0, column=0, sticky=W, pady=5)
|
||
self.port_var = StringVar(value=settings.get("port", ""))
|
||
self.port_combo = ttk.Combobox(settings_frame, textvariable=self.port_var)
|
||
self.port_combo.grid(row=0, column=1, sticky=W, pady=5)
|
||
ttk.Button(settings_frame, text="Обновить", command=self.update_ports).grid(row=0, column=2, padx=5)
|
||
|
||
# Скорость передачи
|
||
ttk.Label(settings_frame, text="Скорость:").grid(row=1, column=0, sticky=W, pady=5)
|
||
self.baudrate_var = StringVar(value=str(settings.get("baudrate", 9600)))
|
||
baudrate_combo = ttk.Combobox(settings_frame, textvariable=self.baudrate_var,
|
||
values=["9600", "19200", "38400", "57600", "115200"])
|
||
baudrate_combo.grid(row=1, column=1, sticky=W, pady=5)
|
||
|
||
# Таймаут
|
||
ttk.Label(settings_frame, text="Таймаут (сек):").grid(row=2, column=0, sticky=W, pady=5)
|
||
self.timeout_var = StringVar(value=str(settings.get("timeout", 10)))
|
||
timeout_entry = ttk.Entry(settings_frame, textvariable=self.timeout_var)
|
||
timeout_entry.grid(row=2, column=1, sticky=W, pady=5)
|
||
|
||
# Режим копирования
|
||
ttk.Label(settings_frame, text="Режим копирования:").grid(row=3, column=0, sticky=W, pady=5)
|
||
self.copy_mode_var = StringVar(value=settings.get("copy_mode", "line"))
|
||
copy_mode_frame = ttk.Frame(settings_frame)
|
||
copy_mode_frame.grid(row=3, column=1, sticky=W, pady=5)
|
||
ttk.Radiobutton(copy_mode_frame, text="Построчно", value="line",
|
||
variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT)
|
||
ttk.Radiobutton(copy_mode_frame, text="Блоками", value="block",
|
||
variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT)
|
||
|
||
# Размер блока
|
||
self.block_size_label = ttk.Label(settings_frame, text="Размер блока:")
|
||
self.block_size_label.grid(row=4, column=0, sticky=W, pady=5)
|
||
self.block_size_var = StringVar(value=str(settings.get("block_size", 15)))
|
||
self.block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var)
|
||
self.block_size_entry.grid(row=4, column=1, sticky=W, pady=5)
|
||
|
||
# Приглашение командной строки
|
||
ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5)
|
||
self.prompt_var = StringVar(value=settings.get("prompt", ">"))
|
||
prompt_entry = CustomEntry(settings_frame, textvariable=self.prompt_var)
|
||
prompt_entry.grid(row=5, column=1, sticky=W, pady=5)
|
||
|
||
# Кнопки
|
||
button_frame = ttk.Frame(settings_frame)
|
||
button_frame.grid(row=6, column=0, columnspan=3, pady=20)
|
||
ttk.Button(button_frame, text="Сохранить", command=self.save_settings).pack(side=LEFT, padx=5)
|
||
ttk.Button(button_frame, text="Отмена", command=self.destroy).pack(side=LEFT, padx=5)
|
||
|
||
self.update_ports()
|
||
|
||
# Инициализация видимости поля размера блока
|
||
self.toggle_block_size()
|
||
|
||
# Центрируем окно
|
||
self.center_window()
|
||
|
||
# Переключение видимости поля размера блока
|
||
def toggle_block_size(self):
|
||
if self.copy_mode_var.get() == "line":
|
||
self.block_size_label.grid_remove()
|
||
self.block_size_entry.grid_remove()
|
||
else:
|
||
self.block_size_label.grid()
|
||
self.block_size_entry.grid()
|
||
|
||
# Центрирование окна
|
||
def center_window(self):
|
||
self.update_idletasks()
|
||
width = self.winfo_width()
|
||
height = self.winfo_height()
|
||
x = (self.winfo_screenwidth() // 2) - (width // 2)
|
||
y = (self.winfo_screenheight() // 2) - (height // 2)
|
||
self.geometry(f"{width}x{height}+{x}+{y}")
|
||
|
||
# Обновление списка доступных последовательных портов
|
||
def update_ports(self):
|
||
ports = list_serial_ports()
|
||
self.port_combo["values"] = ports
|
||
if ports and not self.port_var.get():
|
||
self.port_var.set(ports[0])
|
||
|
||
# Сохранение настроек
|
||
def save_settings(self):
|
||
try:
|
||
self.settings.update({
|
||
"port": self.port_var.get(),
|
||
"baudrate": int(self.baudrate_var.get()),
|
||
"timeout": int(self.timeout_var.get()),
|
||
"copy_mode": self.copy_mode_var.get(),
|
||
"block_size": int(self.block_size_var.get()),
|
||
"prompt": self.prompt_var.get()
|
||
})
|
||
settings_save(self.settings)
|
||
if self.callback:
|
||
self.callback()
|
||
self.destroy()
|
||
messagebox.showinfo("Успех", "Настройки успешно сохранены")
|
||
except ValueError as e:
|
||
messagebox.showerror("Ошибка", "Проверьте правильность введенных значений")
|
||
|
||
# Общая функция для добавления текста в текстовое поле
|
||
def append_text_to_widget(widget, text):
|
||
# Проверяем, заканчивается ли текст символом новой строки
|
||
if not text.endswith('\n'):
|
||
text += '\n'
|
||
widget.insert(END, text)
|
||
widget.see(END)
|
||
|
||
# Общая функция для выбора файла конфигурации
|
||
def select_config_file(self, var, save_to_settings=False):
|
||
filename = filedialog.askopenfilename(
|
||
title="Выберите файл конфигурации",
|
||
filetypes=[("Text files", "*.txt")]
|
||
)
|
||
if filename:
|
||
var.set(filename)
|
||
if save_to_settings:
|
||
self.settings["config_file"] = filename
|
||
settings_save(self.settings)
|
||
|
||
# Общая функция для отправки команды и обработки ответа
|
||
def send_command_and_process_response(
|
||
serial_connection,
|
||
cmd,
|
||
timeout,
|
||
max_attempts=3,
|
||
log_callback=None,
|
||
login=None,
|
||
password=None,
|
||
is_gui=False
|
||
):
|
||
attempt = 0
|
||
while attempt < max_attempts:
|
||
msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
serial_connection.write((cmd + "\n").encode())
|
||
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})")
|
||
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
|
||
if response:
|
||
if '^' in response:
|
||
msg = (
|
||
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
|
||
f"Ответ устройства:\n{response}\n"
|
||
f"Повторная отправка команды...\n"
|
||
)
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.")
|
||
attempt += 1
|
||
time.sleep(1)
|
||
continue
|
||
else:
|
||
msg = f"Ответ устройства:\n{response}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.info(f"Ответ устройства:\n{response}")
|
||
return True, response
|
||
else:
|
||
msg = f"Ответ не получен для команды: {cmd}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.warning(f"Нет ответа для команды: {cmd}")
|
||
return False, None
|
||
|
||
msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.error(msg)
|
||
return False, None
|
||
|
||
# Основной класс для графического интерфейса
|
||
class SerialAppGUI(tk.Tk):
|
||
def __init__(self, settings):
|
||
super().__init__()
|
||
self.title("ComConfigCopy")
|
||
self.geometry("900x700")
|
||
|
||
# Добавляем VERSION как атрибут класса
|
||
self.VERSION = VERSION
|
||
|
||
# Инициализация проверки обновлений
|
||
self.update_checker = UpdateChecker(
|
||
VERSION,
|
||
"https://gitea.filow.ru/LowaSC/ComConfigCopy",
|
||
include_prereleases=False # Проверяем только стабильные версии
|
||
)
|
||
|
||
# Настройка стиля
|
||
self.style = ttk.Style(self)
|
||
self.style.theme_use("clam")
|
||
default_font = ("Segoe UI", 10)
|
||
self.option_add("*Font", default_font)
|
||
|
||
self.settings = settings
|
||
self.connection = None
|
||
self.tftp_server = None
|
||
|
||
self.create_menu()
|
||
self.create_tabs()
|
||
|
||
# Проверка первого запуска
|
||
self.check_first_run()
|
||
|
||
# Создание меню
|
||
def create_menu(self):
|
||
menubar = tk.Menu(self)
|
||
self.config(menu=menubar)
|
||
|
||
# Меню "Файл"
|
||
file_menu = tk.Menu(menubar, tearoff=0)
|
||
menubar.add_cascade(label="Файл", menu=file_menu)
|
||
file_menu.add_command(label="Настройки", command=self.open_settings)
|
||
file_menu.add_separator()
|
||
file_menu.add_command(label="Выход", command=self.quit)
|
||
|
||
# Меню "Справка"
|
||
help_menu = tk.Menu(menubar, tearoff=0)
|
||
menubar.add_cascade(label="Справка", menu=help_menu)
|
||
help_menu.add_command(label="Проверить обновления", command=self.check_for_updates)
|
||
help_menu.add_separator()
|
||
help_menu.add_command(label="О программе", command=self.open_about)
|
||
|
||
# Проверка наличия обновлений
|
||
def check_for_updates(self):
|
||
def on_update_check(update_available, error):
|
||
if error:
|
||
messagebox.showerror(
|
||
"Ошибка проверки обновлений",
|
||
f"Не удалось проверить наличие обновлений:\n{error}"
|
||
)
|
||
elif update_available:
|
||
release_info = self.update_checker.get_release_notes()
|
||
if release_info:
|
||
# Форматируем сообщение
|
||
message = (
|
||
f"Доступна новая версия {release_info['version']}!\n\n"
|
||
f"Тип релиза: {'Пре-релиз' if release_info['type'] == 'prerelease' else 'Стабильный'}\n\n"
|
||
"Изменения:\n"
|
||
f"{release_info['description']}\n\n"
|
||
"Хотите перейти на страницу загрузки?"
|
||
)
|
||
response = messagebox.askyesno(
|
||
"Доступно обновление",
|
||
message,
|
||
)
|
||
if response:
|
||
webbrowser.open(release_info["link"])
|
||
else:
|
||
messagebox.showerror(
|
||
"Ошибка",
|
||
"Не удалось получить информацию о новой версии"
|
||
)
|
||
else:
|
||
messagebox.showinfo(
|
||
"Проверка обновлений",
|
||
"У вас установлена последняя версия программы"
|
||
)
|
||
|
||
self.update_checker.check_updates(callback=on_update_check)
|
||
|
||
# Обработчик изменения настроек
|
||
def on_settings_changed(self):
|
||
self.settings = settings_load()
|
||
|
||
# Открытие окна настроек
|
||
def open_settings(self):
|
||
settings_window = SettingsWindow(self, self.settings, self.on_settings_changed)
|
||
settings_window.transient(self)
|
||
settings_window.grab_set()
|
||
|
||
# Проверка первого запуска программы
|
||
def check_first_run(self):
|
||
show_settings = False
|
||
|
||
# Проверяем существование файла настроек
|
||
if not os.path.exists(SETTINGS_FILE):
|
||
show_settings = True
|
||
else:
|
||
# Проверяем содержимое файла настроек
|
||
try:
|
||
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
|
||
settings = json.load(f)
|
||
# Если порт не настроен, считаем это первым запуском
|
||
if settings.get("port") is None:
|
||
show_settings = True
|
||
except Exception:
|
||
# Если файл поврежден или не читается, тоже показываем настройки
|
||
show_settings = True
|
||
|
||
if show_settings:
|
||
# Создаем папку Settings, если её нет
|
||
os.makedirs("Settings", exist_ok=True)
|
||
|
||
response = messagebox.askyesno(
|
||
"Первый запуск",
|
||
"Это первый запуск программы. Хотите настроить параметры подключения сейчас?"
|
||
)
|
||
if response:
|
||
self.open_settings()
|
||
|
||
# Создание вкладок
|
||
def create_tabs(self):
|
||
self.notebook = ttk.Notebook(self)
|
||
self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
# Создаем вкладки
|
||
interactive_frame = ttk.Frame(self.notebook)
|
||
file_exec_frame = ttk.Frame(self.notebook)
|
||
config_editor_frame = ttk.Frame(self.notebook)
|
||
tftp_frame = ttk.Frame(self.notebook)
|
||
|
||
self.notebook.add(interactive_frame, text="Интерактивный режим")
|
||
self.notebook.add(file_exec_frame, text="Выполнение файла")
|
||
self.notebook.add(config_editor_frame, text="Редактор конфигурации")
|
||
self.notebook.add(tftp_frame, text="TFTP Сервер")
|
||
|
||
self.create_interactive_tab(interactive_frame)
|
||
self.create_file_exec_tab(file_exec_frame)
|
||
self.create_config_editor_tab(config_editor_frame)
|
||
self.create_tftp_tab(tftp_frame)
|
||
|
||
# Создание вкладки "Интерактивный режим"
|
||
def create_interactive_tab(self, frame):
|
||
control_frame = ttk.Frame(frame)
|
||
control_frame.pack(fill=X, pady=5)
|
||
ttk.Button(control_frame, text="Подключиться", command=self.connect_device).pack(side=LEFT, padx=5)
|
||
ttk.Button(control_frame, text="Отключиться", command=self.disconnect_device).pack(side=LEFT, padx=5)
|
||
|
||
self.interactive_text = CustomText(frame, wrap="word", height=20)
|
||
self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
input_frame = ttk.Frame(frame)
|
||
input_frame.pack(fill=X, pady=5)
|
||
ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5)
|
||
self.command_entry = CustomEntry(input_frame, width=50)
|
||
self.command_entry.pack(side=LEFT, padx=5)
|
||
ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5)
|
||
|
||
# Подключение к устройству
|
||
def connect_device(self):
|
||
if self.connection:
|
||
messagebox.showinfo("Информация", "Уже подключено.")
|
||
return
|
||
if not self.settings.get("port"):
|
||
messagebox.showerror("Ошибка", "COM-порт не выбран!")
|
||
return
|
||
self.connection = create_connection(self.settings)
|
||
if self.connection:
|
||
self.append_interactive_text("[INFO] Подключение установлено.\n")
|
||
else:
|
||
self.append_interactive_text("[ERROR] Не удалось установить соединение.\n")
|
||
|
||
# Отключение от устройства
|
||
def disconnect_device(self):
|
||
if self.connection:
|
||
try:
|
||
self.connection.close()
|
||
except Exception:
|
||
pass
|
||
self.connection = None
|
||
self.append_interactive_text("[INFO] Соединение закрыто.\n")
|
||
else:
|
||
messagebox.showinfo("Информация", "Соединение не установлено.")
|
||
|
||
# Отправка команды
|
||
def send_command(self):
|
||
if not self.connection:
|
||
messagebox.showerror("Ошибка", "Сначала установите соединение!")
|
||
return
|
||
cmd = self.command_entry.get().strip()
|
||
if not cmd:
|
||
return
|
||
self.append_interactive_text(f"[INFO] Отправка команды: {cmd}\n")
|
||
threading.Thread(target=self.process_command, args=(cmd,), daemon=True).start()
|
||
|
||
# Обработка команды
|
||
def process_command(self, cmd):
|
||
try:
|
||
success, response = send_command_and_process_response(
|
||
self.connection,
|
||
cmd,
|
||
self.settings.get("timeout", 10),
|
||
max_attempts=3,
|
||
log_callback=self.append_interactive_text,
|
||
login=self.settings.get("login"),
|
||
password=self.settings.get("password"),
|
||
is_gui=True
|
||
)
|
||
except SerialException as e:
|
||
self.append_interactive_text(f"[ERROR] Ошибка при отправке команды: {e}\n")
|
||
logging.error(f"Ошибка отправки команды: {e}", exc_info=True)
|
||
except Exception as e:
|
||
self.append_interactive_text(f"[ERROR] Неизвестная ошибка: {e}\n")
|
||
logging.error(f"Неизвестная ошибка: {e}", exc_info=True)
|
||
|
||
# Добавление текста в текстовое поле
|
||
def append_interactive_text(self, text):
|
||
append_text_to_widget(self.interactive_text, text)
|
||
|
||
# Создание вкладки "Выполнить команды из файла"
|
||
def create_file_exec_tab(self, frame):
|
||
file_frame = ttk.Frame(frame)
|
||
file_frame.pack(fill=X, pady=5)
|
||
ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
|
||
self.file_exec_var = StringVar(value=self.settings.get("config_file") or "")
|
||
CustomEntry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5)
|
||
ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5)
|
||
ttk.Button(frame, text="Выполнить команды", command=self.execute_file_commands).pack(pady=5)
|
||
self.file_exec_text = CustomText(frame, wrap="word", height=15)
|
||
self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
# Выбор файла конфигурации для выполнения команд
|
||
def select_config_file_fileexec(self):
|
||
select_config_file(self, self.file_exec_var)
|
||
|
||
# Выполнение команд из файла
|
||
def execute_file_commands(self):
|
||
if not self.settings.get("port"):
|
||
messagebox.showerror("Ошибка", "COM-порт не выбран!")
|
||
return
|
||
if not self.file_exec_var.get():
|
||
messagebox.showerror("Ошибка", "Файл конфигурации не выбран!")
|
||
return
|
||
if not self.connection:
|
||
self.connection = create_connection(self.settings)
|
||
if not self.connection:
|
||
self.append_file_exec_text("[ERROR] Не удалось установить соединение.\n")
|
||
return
|
||
threading.Thread(
|
||
target=execute_commands_from_file,
|
||
args=(
|
||
self.connection,
|
||
self.file_exec_var.get(),
|
||
self.settings.get("timeout", 10),
|
||
self.settings.get("copy_mode", "line"),
|
||
self.settings.get("block_size", 15),
|
||
self.append_file_exec_text,
|
||
self.settings.get("login"),
|
||
self.settings.get("password"),
|
||
True,
|
||
),
|
||
daemon=True,
|
||
).start()
|
||
|
||
def append_file_exec_text(self, text):
|
||
append_text_to_widget(self.file_exec_text, text)
|
||
|
||
# Создание вкладки "Редактор конфигурационного файла"
|
||
def create_config_editor_tab(self, frame):
|
||
top_frame = ttk.Frame(frame)
|
||
top_frame.pack(fill=X, pady=5)
|
||
ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
|
||
self.editor_file_var = StringVar(value=self.settings.get("config_file") or "")
|
||
CustomEntry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5)
|
||
ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5)
|
||
ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5)
|
||
ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5)
|
||
self.config_editor_text = CustomText(frame, wrap="word")
|
||
self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
# Выбор файла конфигурации для редактирования
|
||
def select_config_file_editor(self):
|
||
select_config_file(self, self.editor_file_var, save_to_settings=True)
|
||
|
||
# Загрузка файла конфигурации
|
||
def load_config_file(self):
|
||
filename = self.editor_file_var.get()
|
||
if not filename or not os.path.exists(filename):
|
||
messagebox.showerror("Ошибка", "Файл конфигурации не выбран или не существует.")
|
||
return
|
||
try:
|
||
with open(filename, "r", encoding="utf-8") as f:
|
||
content = f.read()
|
||
self.config_editor_text.delete("1.0", END)
|
||
self.config_editor_text.insert(END, content)
|
||
messagebox.showinfo("Информация", "Файл загружен.")
|
||
except Exception as e:
|
||
logging.error(f"Ошибка загрузки файла: {e}", exc_info=True)
|
||
messagebox.showerror("Ошибка", f"Не удалось загрузить файл:\n{e}")
|
||
|
||
# Сохранение файла конфигурации
|
||
def save_config_file(self):
|
||
filename = self.editor_file_var.get()
|
||
if not filename:
|
||
messagebox.showerror("Ошибка", "Файл конфигурации не выбран.")
|
||
return
|
||
try:
|
||
content = self.config_editor_text.get("1.0", END)
|
||
with open(filename, "w", encoding="utf-8") as f:
|
||
f.write(content)
|
||
messagebox.showinfo("Информация", "Файл сохранён.")
|
||
except Exception as e:
|
||
logging.error(f"Ошибка сохранения файла: {e}", exc_info=True)
|
||
messagebox.showerror("Ошибка", f"Не удалось сохранить файл:\n{e}")
|
||
|
||
# Открытие окна "О программе"
|
||
def open_about(self):
|
||
about_window = AboutWindow(self)
|
||
about_window.transient(self)
|
||
about_window.grab_set()
|
||
|
||
# Создание вкладки TFTP сервера
|
||
def create_tftp_tab(self, frame):
|
||
# Создаем фрейм для управления TFTP сервером
|
||
tftp_frame = ttk.Frame(frame)
|
||
tftp_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
# Создаем и размещаем элементы управления
|
||
controls_frame = ttk.LabelFrame(tftp_frame, text="Управление TFTP сервером")
|
||
controls_frame.pack(fill=X, padx=5, pady=5)
|
||
|
||
# IP адрес
|
||
ip_frame = ttk.Frame(controls_frame)
|
||
ip_frame.pack(fill=X, padx=5, pady=2)
|
||
ttk.Label(ip_frame, text="IP адрес:").pack(side=LEFT, padx=5)
|
||
self.tftp_ip_var = StringVar(value="0.0.0.0")
|
||
self.tftp_ip_combo = ttk.Combobox(ip_frame, textvariable=self.tftp_ip_var, state="readonly")
|
||
self.tftp_ip_combo.pack(side=LEFT, fill=X, expand=True, padx=5)
|
||
ttk.Button(ip_frame, text="Обновить", command=self.update_network_adapters).pack(side=LEFT, padx=5)
|
||
|
||
# Заполняем список адаптеров
|
||
self.update_network_adapters()
|
||
|
||
# Порт
|
||
port_frame = ttk.Frame(controls_frame)
|
||
port_frame.pack(fill=X, padx=5, pady=2)
|
||
ttk.Label(port_frame, text="Порт:").pack(side=LEFT, padx=5)
|
||
self.tftp_port_var = StringVar(value="69")
|
||
self.tftp_port_entry = CustomEntry(port_frame, textvariable=self.tftp_port_var)
|
||
self.tftp_port_entry.pack(fill=X, expand=True, padx=5)
|
||
|
||
# Кнопки управления
|
||
buttons_frame = ttk.Frame(controls_frame)
|
||
buttons_frame.pack(fill=X, padx=5, pady=5)
|
||
|
||
self.start_tftp_button = ttk.Button(
|
||
buttons_frame,
|
||
text="Запустить сервер",
|
||
command=self.start_tftp_server
|
||
)
|
||
self.start_tftp_button.pack(side=LEFT, padx=5)
|
||
|
||
self.stop_tftp_button = ttk.Button(
|
||
buttons_frame,
|
||
text="Остановить сервер",
|
||
command=self.stop_tftp_server,
|
||
state="disabled"
|
||
)
|
||
self.stop_tftp_button.pack(side=LEFT, padx=5)
|
||
|
||
# Лог сервера
|
||
log_frame = ttk.LabelFrame(tftp_frame, text="Лог сервера")
|
||
log_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
self.tftp_log_text = CustomText(log_frame, wrap=tk.WORD, height=10)
|
||
self.tftp_log_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
# Добавляем скроллбар для лога
|
||
scrollbar = ttk.Scrollbar(self.tftp_log_text, command=self.tftp_log_text.yview)
|
||
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
|
||
self.tftp_log_text.config(yscrollcommand=scrollbar.set)
|
||
|
||
# Статус передач
|
||
transfers_frame = ttk.LabelFrame(tftp_frame, text="Активные передачи")
|
||
transfers_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
# Создаем таблицу для отображения активных передач
|
||
columns = ("client", "filename", "progress", "remaining", "time")
|
||
self.transfers_tree = ttk.Treeview(transfers_frame, columns=columns, show="headings")
|
||
|
||
# Настраиваем заголовки колонок
|
||
self.transfers_tree.heading("client", text="Клиент")
|
||
self.transfers_tree.heading("filename", text="Файл")
|
||
self.transfers_tree.heading("progress", text="Прогресс")
|
||
self.transfers_tree.heading("remaining", text="Осталось")
|
||
self.transfers_tree.heading("time", text="Время")
|
||
|
||
# Настраиваем ширину колонок
|
||
self.transfers_tree.column("client", width=120)
|
||
self.transfers_tree.column("filename", width=150)
|
||
self.transfers_tree.column("progress", width=100)
|
||
self.transfers_tree.column("remaining", width=100)
|
||
self.transfers_tree.column("time", width=80)
|
||
|
||
self.transfers_tree.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
# Инициализация TFTP сервера
|
||
self.tftp_server = None
|
||
self.tftp_server_thread = None
|
||
|
||
# Запуск TFTP сервера
|
||
def start_tftp_server(self):
|
||
try:
|
||
# Получаем выбранный IP-адрес
|
||
ip = self.tftp_ip_var.get()
|
||
if not ip:
|
||
messagebox.showerror("Ошибка", "Выберите IP-адрес для TFTP сервера")
|
||
return
|
||
|
||
# Проверяем корректность порта
|
||
try:
|
||
port = int(self.tftp_port_var.get())
|
||
if port <= 0 or port > 65535:
|
||
raise ValueError("Порт должен быть в диапазоне 1-65535")
|
||
except ValueError as e:
|
||
messagebox.showerror("Ошибка", f"Некорректный порт: {str(e)}")
|
||
return
|
||
|
||
# Создаем экземпляр TFTP сервера
|
||
self.tftp_server = TFTPServer("Firmware")
|
||
|
||
# Устанавливаем callback для логирования
|
||
def log_callback(message):
|
||
# Фильтруем дублирующиеся сообщения о запуске/остановке сервера
|
||
if "[INFO] TFTP сервер запущен" in message and hasattr(self, '_server_started'):
|
||
return
|
||
if "[INFO] TFTP сервер остановлен" in message and hasattr(self, '_server_stopped'):
|
||
return
|
||
|
||
self.append_tftp_log(message)
|
||
|
||
# Устанавливаем флаги для отслеживания состояния
|
||
if "[INFO] TFTP сервер запущен" in message:
|
||
self._server_started = True
|
||
elif "[INFO] TFTP сервер остановлен" in message:
|
||
self._server_stopped = True
|
||
|
||
# Обновляем информацию о передачах
|
||
self.update_transfers_info()
|
||
|
||
self.tftp_server.set_log_callback(log_callback)
|
||
|
||
# Запускаем сервер в отдельном потоке
|
||
self.tftp_server_thread = threading.Thread(
|
||
target=self.run_tftp_server,
|
||
args=(ip, port),
|
||
daemon=True
|
||
)
|
||
self.tftp_server_thread.start()
|
||
|
||
# Обновляем состояние кнопок и элементов управления
|
||
self.start_tftp_button.config(state="disabled")
|
||
self.stop_tftp_button.config(state="normal")
|
||
self.tftp_ip_combo.config(state="disabled")
|
||
self.tftp_port_entry.config(state="disabled")
|
||
|
||
# Запускаем периодическое обновление информации о передачах
|
||
self.update_transfers_periodically()
|
||
|
||
except Exception as e:
|
||
self.append_tftp_log(f"[ERROR] Ошибка запуска сервера: {str(e)}")
|
||
messagebox.showerror("Ошибка", f"Не удалось запустить TFTP сервер: {str(e)}")
|
||
|
||
# Запуск TFTP сервера в отдельном потоке
|
||
def run_tftp_server(self, ip, port):
|
||
try:
|
||
self.tftp_server.start_server(ip, port)
|
||
except Exception as e:
|
||
self.append_tftp_log(f"[ERROR] Ошибка работы сервера: {str(e)}")
|
||
|
||
# Остановка TFTP сервера
|
||
def stop_tftp_server(self):
|
||
if self.tftp_server:
|
||
try:
|
||
# Отключаем кнопки на время остановки сервера
|
||
self.start_tftp_button.config(state="disabled")
|
||
self.stop_tftp_button.config(state="disabled")
|
||
|
||
# Сбрасываем флаги состояния
|
||
if hasattr(self, '_server_started'):
|
||
delattr(self, '_server_started')
|
||
if hasattr(self, '_server_stopped'):
|
||
delattr(self, '_server_stopped')
|
||
|
||
# Останавливаем сервер
|
||
self.tftp_server.stop_server()
|
||
|
||
# Ждем завершения потока сервера с таймаутом
|
||
if self.tftp_server_thread:
|
||
self.tftp_server_thread.join(timeout=5.0)
|
||
if self.tftp_server_thread.is_alive():
|
||
self.append_tftp_log("[WARN] Превышено время ожидания остановки сервера")
|
||
|
||
# Очищаем ссылки на сервер и поток
|
||
self.tftp_server = None
|
||
self.tftp_server_thread = None
|
||
|
||
# Обновляем состояние кнопок
|
||
self.start_tftp_button.config(state="normal")
|
||
self.stop_tftp_button.config(state="disabled")
|
||
self.tftp_ip_combo.config(state="normal")
|
||
self.tftp_port_entry.config(state="normal")
|
||
|
||
# Очищаем таблицу передач
|
||
for item in self.transfers_tree.get_children():
|
||
self.transfers_tree.delete(item)
|
||
|
||
except Exception as e:
|
||
self.append_tftp_log(f"[ERROR] Ошибка остановки сервера: {str(e)}")
|
||
messagebox.showerror("Ошибка", f"Не удалось остановить TFTP сервер: {str(e)}")
|
||
|
||
# Восстанавливаем состояние кнопок в случае ошибки
|
||
self.start_tftp_button.config(state="disabled")
|
||
self.stop_tftp_button.config(state="normal")
|
||
|
||
def append_tftp_log(self, text):
|
||
append_text_to_widget(self.tftp_log_text, text)
|
||
|
||
# Обновление информации об активных передачах
|
||
def update_transfers_info(self):
|
||
if not self.tftp_server:
|
||
return
|
||
|
||
# Очищаем текущие записи
|
||
for item in self.transfers_tree.get_children():
|
||
self.transfers_tree.delete(item)
|
||
|
||
# Добавляем информацию о текущих передачах
|
||
for client_addr, transfer_info in self.tftp_server.active_transfers.items():
|
||
filename = transfer_info['filename']
|
||
bytes_sent = transfer_info['bytes_sent']
|
||
filesize = transfer_info['filesize']
|
||
start_time = transfer_info['start_time']
|
||
|
||
# Вычисляем прогресс
|
||
progress = f"{bytes_sent}/{filesize} байт"
|
||
remaining_bytes = filesize - bytes_sent
|
||
elapsed_time = time.time() - start_time
|
||
|
||
# Вычисляем скорость передачи (байт/сек)
|
||
if elapsed_time > 0:
|
||
transfer_speed = bytes_sent / elapsed_time
|
||
# Вычисляем оставшееся время
|
||
if transfer_speed > 0:
|
||
remaining_time = remaining_bytes / transfer_speed
|
||
remaining_str = f"{remaining_bytes} байт (~{int(remaining_time)}с)"
|
||
else:
|
||
remaining_str = f"{remaining_bytes} байт (неизвестно)"
|
||
else:
|
||
remaining_str = f"{remaining_bytes} байт (вычисляется...)"
|
||
|
||
# Добавляем запись в таблицу
|
||
self.transfers_tree.insert("", END, values=(
|
||
f"{client_addr[0]}:{client_addr[1]}",
|
||
filename,
|
||
progress,
|
||
remaining_str,
|
||
f"{elapsed_time:.1f}с"
|
||
))
|
||
|
||
# Периодическое обновление информации о передачах
|
||
def update_transfers_periodically(self):
|
||
if self.tftp_server and self.tftp_server.running:
|
||
self.update_transfers_info()
|
||
# Планируем следующее обновление через 1 секунду
|
||
self.after(1000, self.update_transfers_periodically)
|
||
|
||
# Обновление списка сетевых адаптеров
|
||
def update_network_adapters(self):
|
||
adapters = get_network_adapters()
|
||
self.tftp_ip_combo["values"] = adapters
|
||
if not self.tftp_ip_var.get() in adapters:
|
||
self.tftp_ip_var.set(adapters[0])
|
||
|
||
# ==========================
|
||
# Основной запуск приложения
|
||
# ==========================
|
||
def main():
|
||
setup_logging()
|
||
settings = settings_load()
|
||
app = SerialAppGUI(settings)
|
||
app.mainloop()
|
||
|
||
# ==========================
|
||
# Основной запуск приложения
|
||
# ==========================
|
||
if __name__ == "__main__":
|
||
try:
|
||
main()
|
||
except KeyboardInterrupt:
|
||
logging.info("Программа прервана пользователем (KeyboardInterrupt).")
|
||
sys.exit(0)
|
||
except Exception as e:
|
||
logging.critical(f"Неизвестная ошибка: {e}", exc_info=True)
|
||
sys.exit(1)
|