ComConfigCopy/ComConfigCopy.py
Lowa 1a511ff54f Enhance update checking
- Refactor UpdateChecker
- Add support for parsing release details with improved formatting
- Implement more robust version comparison and release type handling
- Add logging for update checking process
- Improve error handling and release information extraction
- Update update checking logic to handle stable and pre-release versions
2025-02-17 00:53:24 +03:00

1288 lines
60 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ------------------------------------------------------------
# Это программа для копирования конфигураций на коммутаторы
# ------------------------------------------------------------
import json
import logging
import os
import re
import sys
import threading
import time
import webbrowser
from getpass import getpass
from logging.handlers import RotatingFileHandler
import tkinter as tk
from tkinter import (
StringVar,
END,
BOTH,
LEFT,
X,
W,
filedialog,
messagebox,
simpledialog,
)
from tkinter import ttk
import serial
import serial.tools.list_ports
from serial.serialutil import SerialException
from about_window import AboutWindow
from TFTPServer import TFTPServer
import socket
from update_checker import UpdateChecker
# Версия программы
VERSION = "1.0.1"
# Создаем необходимые папки
os.makedirs("Logs", exist_ok=True)
os.makedirs("Configs", exist_ok=True)
os.makedirs("Settings", exist_ok=True)
os.makedirs("Firmware", exist_ok=True)
os.makedirs("docs", exist_ok=True)
# Файл настроек находится в папке Settings
SETTINGS_FILE = os.path.join("Settings", "settings.json")
# Настройка логирования с использованием RotatingFileHandler.
def setup_logging():
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
log_path = os.path.join("Logs", "app.log")
handler = RotatingFileHandler(
log_path, maxBytes=5 * 1024 * 1024, backupCount=3, encoding="utf-8"
)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
# Загрузка настроек из JSON-файла или создание настроек по умолчанию.
def settings_load():
default_settings = {
"port": None, # Порт для подключения
"baudrate": 9600, # Скорость передачи данных
"config_file": None, # Файл конфигурации
"login": None, # Логин для подключения
"password": None, # Пароль для подключения
"timeout": 10, # Таймаут подключения
"copy_mode": "line", # Режим копирования
"block_size": 15, # Размер блока команд
"prompt": ">", # Используется для определения приглашения
}
# Создаем папку Settings, если её нет
os.makedirs("Settings", exist_ok=True)
if not os.path.exists(SETTINGS_FILE):
try:
# При первом запуске создаем новый файл настроек
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
json.dump(default_settings, f, indent=4, ensure_ascii=False)
logging.info("Файл настроек создан с настройками по умолчанию.")
return default_settings
except Exception as e:
logging.error(f"Ошибка при создании файла настроек: {e}", exc_info=True)
return default_settings
try:
# Пытаемся загрузить существующие настройки
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
settings = json.load(f)
# Проверяем наличие всех необходимых параметров
settings_changed = False
for key, value in default_settings.items():
if key not in settings:
settings[key] = value
settings_changed = True
# Если были добавлены новые параметры, сохраняем обновленные настройки
if settings_changed:
try:
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
json.dump(settings, f, indent=4, ensure_ascii=False)
logging.info("Файл настроек обновлен с новыми параметрами.")
except Exception as e:
logging.error(f"Ошибка при обновлении файла настроек: {e}", exc_info=True)
return settings
except Exception as e:
logging.error(f"Ошибка загрузки файла настроек: {e}", exc_info=True)
return default_settings
# Сохранение настроек в JSON-файл
def settings_save(settings):
try:
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
json.dump(settings, f, indent=4, ensure_ascii=False)
logging.info("Настройки сохранены в файл.")
except Exception as e:
logging.error(f"Ошибка при сохранении настроек: {e}", exc_info=True)
# Получение списка доступных последовательных портов
def list_serial_ports():
"""Получение списка доступных последовательных портов."""
ports = serial.tools.list_ports.comports()
logging.debug(f"Найдено {len(ports)} серийных портов.")
return [port.device for port in ports]
# Получение списка IP-адресов из сетевых адаптеров
def get_network_adapters():
adapters = []
try:
# Получаем имя хоста
hostname = socket.gethostname()
# Получаем все адреса для данного хоста
addresses = socket.getaddrinfo(hostname, None)
# Создаем множество для хранения уникальных IP-адресов
unique_ips = set()
for addr in addresses:
ip = addr[4][0]
# Пропускаем IPv6 и локальные адреса
if ':' not in ip and not ip.startswith('127.'):
unique_ips.add(ip)
# Добавляем все найденные IP-адреса в список
for ip in sorted(unique_ips):
adapters.append(f"{ip}")
# Добавляем 0.0.0.0 для прослушивания всех интерфейсов
adapters.insert(0, "0.0.0.0")
except Exception as e:
logging.error(f"Ошибка при получении списка сетевых адаптеров: {e}", exc_info=True)
# В случае ошибки возвращаем хотя бы 0.0.0.0
adapters = ["0.0.0.0"]
return adapters
# Создание соединения с устройством через последовательный порт
def create_connection(settings):
try:
conn = serial.Serial(settings["port"], settings["baudrate"], timeout=1)
logging.info(f"Соединение установлено с {settings['port']} на {settings['baudrate']}.")
time.sleep(1)
return conn
except SerialException as e:
logging.error(f"Ошибка подключения: {e}", exc_info=True)
except Exception as e:
logging.error(f"Неизвестная ошибка подключения: {e}", exc_info=True)
return None
# Проверка наличия логина и пароля в настройках и отправка их на устройство
def send_login_password(serial_connection, login=None, password=None, is_gui=False):
if not login:
if is_gui:
login = simpledialog.askstring("Login", "Введите логин:")
if login is None:
login = ""
else:
login = input("Введите логин: ")
if not password:
if is_gui:
password = simpledialog.askstring("Password", "Введите пароль:", show="*")
if password is None:
password = ""
else:
password = getpass("Введите пароль: ")
# Чтение ответа от устройства с учётом таймаута.
def read_response(serial_connection, timeout, login=None, password=None, is_gui=False):
response = b""
end_time = time.time() + timeout
decoded = ""
while time.time() < end_time:
if serial_connection.in_waiting:
chunk = serial_connection.read(serial_connection.in_waiting)
response += chunk
if b"--More--" in response:
serial_connection.write(b"\n")
response = response.replace(b"--More--", b"")
try:
decoded = response.decode(errors="ignore")
except Exception:
decoded = ""
lines = decoded.rstrip().splitlines()
if lines:
last_line = lines[-1].strip()
if re.search(r'(login:|username:)$', last_line, re.IGNORECASE):
send_login_password(serial_connection, login, None, is_gui)
response = b""
continue
if re.search(r'(password:)$', last_line, re.IGNORECASE):
send_login_password(serial_connection, None, password, is_gui)
response = b""
continue
if last_line.endswith(">") or last_line.endswith("#"):
break
else:
time.sleep(0.1)
return decoded
# Генерация блоков команд для блочного копирования
def generate_command_blocks(lines, block_size):
blocks = []
current_block = []
for line in lines:
trimmed = line.strip()
if not trimmed:
continue
lower_line = trimmed.lower()
if lower_line.startswith("vlan") or lower_line.startswith("enable") or lower_line.startswith("interface"):
if current_block:
blocks.append("\n".join(current_block))
current_block = []
blocks.append(trimmed)
elif lower_line.startswith("exit"):
current_block.append(trimmed)
blocks.append("\n".join(current_block))
current_block = []
else:
current_block.append(trimmed)
if len(current_block) >= block_size:
blocks.append("\n".join(current_block))
current_block = []
if current_block:
blocks.append("\n".join(current_block))
return blocks
# Выполнение команд из файла конфигурации
def execute_commands_from_file(
serial_connection,
filename,
timeout,
copy_mode,
block_size,
log_callback=None,
login=None,
password=None,
is_gui=False,
):
try:
with open(filename, "r", encoding="utf-8") as file:
lines = [line for line in file if line.strip()]
msg = f"Выполнение команд из файла: {filename}\n"
logging.info(msg)
if log_callback:
log_callback(msg)
# Если выбран построчный режим
if copy_mode == "line":
for cmd in lines:
cmd = cmd.strip()
max_attempts = 3
attempt = 0
while attempt < max_attempts:
msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n"
if log_callback:
log_callback(msg)
serial_connection.write((cmd + "\n").encode())
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})")
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
if response:
if '^' in response:
msg = (
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
f"Ответ устройства:\n{response}\n"
f"Повторная отправка команды...\n"
)
if log_callback:
log_callback(msg)
logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.")
attempt += 1
time.sleep(1)
continue
else:
msg = f"Ответ устройства:\n{response}\n"
if log_callback:
log_callback(msg)
logging.info(f"Ответ устройства:\n{response}")
break
else:
msg = f"Ответ не получен для команды: {cmd}\n"
if log_callback:
log_callback(msg)
logging.warning(f"Нет ответа для команды: {cmd}")
break
if attempt == max_attempts:
msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n"
if log_callback:
log_callback(msg)
logging.error(msg)
time.sleep(1)
# Если выбран блочный режим
elif copy_mode == "block":
blocks = generate_command_blocks(lines, block_size)
for block in blocks:
msg = f"\nОтправка блока команд:\n{block}\n"
if log_callback:
log_callback(msg)
serial_connection.write((block + "\n").encode())
logging.info(f"Отправлен блок команд:\n{block}")
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
# Если обнаружена ошибка в ответе на блок, отправляем команды по очереди
if response and '^' in response:
msg = (
f"[WARNING] Обнаружена ошибка при выполнении блока команд.\n"
f"Ответ устройства:\n{response}\n"
f"Пересылаются команды по отдельности...\n"
)
if log_callback:
log_callback(msg)
logging.warning("Ошибка в блочном режиме отправляем команды индивидуально.")
for line in block.splitlines():
cmd = line.strip()
if not cmd:
continue
max_attempts = 3
attempt = 0
while attempt < max_attempts:
sub_msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n"
if log_callback:
log_callback(sub_msg)
serial_connection.write((cmd + "\n").encode())
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})")
sub_response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
if sub_response:
if '^' in sub_response:
sub_msg = (
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
f"Ответ устройства:\n{sub_response}\n"
f"Повторная отправка команды...\n"
)
if log_callback:
log_callback(sub_msg)
logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.")
attempt += 1
time.sleep(1)
continue
else:
sub_msg = f"Ответ устройства:\n{sub_response}\n"
if log_callback:
log_callback(sub_msg)
logging.info(f"Ответ устройства:\n{sub_response}")
break
else:
sub_msg = f"Ответ не получен для команды: {cmd}\n"
if log_callback:
log_callback(sub_msg)
logging.warning(f"Нет ответа для команды: {cmd}")
break
if attempt == max_attempts:
sub_msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n"
if log_callback:
log_callback(sub_msg)
logging.error(sub_msg)
time.sleep(1)
else:
if response:
msg = f"Ответ устройства:\n{response}\n"
if log_callback:
log_callback(msg)
logging.info(f"Ответ устройства:\n{response}")
else:
msg = f"Ответ не получен для блока:\n{block}\n"
if log_callback:
log_callback(msg)
logging.warning(f"Нет ответа для блока:\n{block}")
time.sleep(1)
except SerialException as e:
msg = f"Ошибка при выполнении команды: {e}\n"
if log_callback:
log_callback(msg)
logging.error(f"Ошибка при выполнении команды: {e}", exc_info=True)
except Exception as e:
msg = f"Ошибка при выполнении команд: {e}\n"
if log_callback:
log_callback(msg)
logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True)
# Базовый класс для кастомных виджетов с поддержкой контекстного меню и горячих клавиш
class CustomWidgetBase:
def __init__(self):
self.create_context_menu()
self.bind_shortcuts()
# Создание контекстного меню
def create_context_menu(self):
"""Создание контекстного меню"""
self.context_menu = tk.Menu(self, tearoff=0)
self.context_menu.add_command(label="Вырезать", command=self.cut)
self.context_menu.add_command(label="Копировать", command=self.copy)
self.context_menu.add_command(label="Вставить", command=self.paste)
self.context_menu.add_separator()
self.context_menu.add_command(label="Выделить всё", command=self.select_all)
self.bind("<Button-3>", self.show_context_menu)
# Привязка горячих клавиш
def bind_shortcuts(self):
"""Привязка горячих клавиш"""
# Стандартные сочетания
self.bind("<Control-x>", lambda e: self.event_generate("<<Cut>>"))
self.bind("<Control-c>", lambda e: self.event_generate("<<Copy>>"))
self.bind("<Control-v>", lambda e: self.event_generate("<<Paste>>"))
self.bind("<Control-a>", self.select_all)
# Дополнительные сочетания
self.bind("<Shift-Insert>", lambda e: self.event_generate("<<Paste>>"))
self.bind("<Control-Insert>", lambda e: self.event_generate("<<Copy>>"))
self.bind("<Shift-Delete>", lambda e: self.event_generate("<<Cut>>"))
# Отображение контекстного меню
def show_context_menu(self, event):
"""Отображение контекстного меню"""
self.context_menu.post(event.x_root, event.y_root)
# Вырезание текста
def cut(self):
self.event_generate("<<Cut>>")
# Копирование текста
def copy(self):
self.event_generate("<<Copy>>")
# Вставка текста
def paste(self):
self.event_generate("<<Paste>>")
# Класс для текстового поля с поддержкой контекстного меню и горячих клавиш
class CustomText(tk.Text, CustomWidgetBase):
def __init__(self, *args, **kwargs):
tk.Text.__init__(self, *args, **kwargs)
CustomWidgetBase.__init__(self)
def select_all(self, event=None):
"""Выделение всего текста в Text виджете"""
self.tag_add(tk.SEL, "1.0", tk.END)
self.mark_set(tk.INSERT, "1.0")
self.see(tk.INSERT)
return "break"
# Класс для поля ввода с поддержкой контекстного меню и горячих клавиш
class CustomEntry(ttk.Entry, CustomWidgetBase):
def __init__(self, *args, **kwargs):
ttk.Entry.__init__(self, *args, **kwargs)
CustomWidgetBase.__init__(self)
def select_all(self, event=None):
"""Выделение всего текста в Entry виджете"""
self.select_range(0, tk.END)
return "break"
# Класс для окна настроек
class SettingsWindow(tk.Toplevel):
def __init__(self, parent, settings, callback=None):
super().__init__(parent)
self.title("Настройки")
self.geometry("600x400")
self.settings = settings
self.callback = callback
self.resizable(False, False)
# Создаем фрейм для настроек
settings_frame = ttk.Frame(self, padding="10")
settings_frame.pack(fill=BOTH, expand=True)
# COM порт
ttk.Label(settings_frame, text="COM порт:").grid(row=0, column=0, sticky=W, pady=5)
self.port_var = StringVar(value=settings.get("port", ""))
self.port_combo = ttk.Combobox(settings_frame, textvariable=self.port_var)
self.port_combo.grid(row=0, column=1, sticky=W, pady=5)
ttk.Button(settings_frame, text="Обновить", command=self.update_ports).grid(row=0, column=2, padx=5)
# Скорость передачи
ttk.Label(settings_frame, text="Скорость:").grid(row=1, column=0, sticky=W, pady=5)
self.baudrate_var = StringVar(value=str(settings.get("baudrate", 9600)))
baudrate_combo = ttk.Combobox(settings_frame, textvariable=self.baudrate_var,
values=["9600", "19200", "38400", "57600", "115200"])
baudrate_combo.grid(row=1, column=1, sticky=W, pady=5)
# Таймаут
ttk.Label(settings_frame, text="Таймаут (сек):").grid(row=2, column=0, sticky=W, pady=5)
self.timeout_var = StringVar(value=str(settings.get("timeout", 10)))
timeout_entry = ttk.Entry(settings_frame, textvariable=self.timeout_var)
timeout_entry.grid(row=2, column=1, sticky=W, pady=5)
# Режим копирования
ttk.Label(settings_frame, text="Режим копирования:").grid(row=3, column=0, sticky=W, pady=5)
self.copy_mode_var = StringVar(value=settings.get("copy_mode", "line"))
copy_mode_frame = ttk.Frame(settings_frame)
copy_mode_frame.grid(row=3, column=1, sticky=W, pady=5)
ttk.Radiobutton(copy_mode_frame, text="Построчно", value="line",
variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT)
ttk.Radiobutton(copy_mode_frame, text="Блоками", value="block",
variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT)
# Размер блока
self.block_size_label = ttk.Label(settings_frame, text="Размер блока:")
self.block_size_label.grid(row=4, column=0, sticky=W, pady=5)
self.block_size_var = StringVar(value=str(settings.get("block_size", 15)))
self.block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var)
self.block_size_entry.grid(row=4, column=1, sticky=W, pady=5)
# Приглашение командной строки
ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5)
self.prompt_var = StringVar(value=settings.get("prompt", ">"))
prompt_entry = CustomEntry(settings_frame, textvariable=self.prompt_var)
prompt_entry.grid(row=5, column=1, sticky=W, pady=5)
# Кнопки
button_frame = ttk.Frame(settings_frame)
button_frame.grid(row=6, column=0, columnspan=3, pady=20)
ttk.Button(button_frame, text="Сохранить", command=self.save_settings).pack(side=LEFT, padx=5)
ttk.Button(button_frame, text="Отмена", command=self.destroy).pack(side=LEFT, padx=5)
self.update_ports()
# Инициализация видимости поля размера блока
self.toggle_block_size()
# Центрируем окно
self.center_window()
# Переключение видимости поля размера блока
def toggle_block_size(self):
if self.copy_mode_var.get() == "line":
self.block_size_label.grid_remove()
self.block_size_entry.grid_remove()
else:
self.block_size_label.grid()
self.block_size_entry.grid()
# Центрирование окна
def center_window(self):
self.update_idletasks()
width = self.winfo_width()
height = self.winfo_height()
x = (self.winfo_screenwidth() // 2) - (width // 2)
y = (self.winfo_screenheight() // 2) - (height // 2)
self.geometry(f"{width}x{height}+{x}+{y}")
# Обновление списка доступных последовательных портов
def update_ports(self):
ports = list_serial_ports()
self.port_combo["values"] = ports
if ports and not self.port_var.get():
self.port_var.set(ports[0])
# Сохранение настроек
def save_settings(self):
try:
self.settings.update({
"port": self.port_var.get(),
"baudrate": int(self.baudrate_var.get()),
"timeout": int(self.timeout_var.get()),
"copy_mode": self.copy_mode_var.get(),
"block_size": int(self.block_size_var.get()),
"prompt": self.prompt_var.get()
})
settings_save(self.settings)
if self.callback:
self.callback()
self.destroy()
messagebox.showinfo("Успех", "Настройки успешно сохранены")
except ValueError as e:
messagebox.showerror("Ошибка", "Проверьте правильность введенных значений")
# Общая функция для добавления текста в текстовое поле
def append_text_to_widget(widget, text):
# Проверяем, заканчивается ли текст символом новой строки
if not text.endswith('\n'):
text += '\n'
widget.insert(END, text)
widget.see(END)
# Общая функция для выбора файла конфигурации
def select_config_file(self, var, save_to_settings=False):
filename = filedialog.askopenfilename(
title="Выберите файл конфигурации",
filetypes=[("Text files", "*.txt")]
)
if filename:
var.set(filename)
if save_to_settings:
self.settings["config_file"] = filename
settings_save(self.settings)
# Общая функция для отправки команды и обработки ответа
def send_command_and_process_response(
serial_connection,
cmd,
timeout,
max_attempts=3,
log_callback=None,
login=None,
password=None,
is_gui=False
):
attempt = 0
while attempt < max_attempts:
msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n"
if log_callback:
log_callback(msg)
serial_connection.write((cmd + "\n").encode())
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})")
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
if response:
if '^' in response:
msg = (
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
f"Ответ устройства:\n{response}\n"
f"Повторная отправка команды...\n"
)
if log_callback:
log_callback(msg)
logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.")
attempt += 1
time.sleep(1)
continue
else:
msg = f"Ответ устройства:\n{response}\n"
if log_callback:
log_callback(msg)
logging.info(f"Ответ устройства:\n{response}")
return True, response
else:
msg = f"Ответ не получен для команды: {cmd}\n"
if log_callback:
log_callback(msg)
logging.warning(f"Нет ответа для команды: {cmd}")
return False, None
msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n"
if log_callback:
log_callback(msg)
logging.error(msg)
return False, None
# Основной класс для графического интерфейса
class SerialAppGUI(tk.Tk):
def __init__(self, settings):
super().__init__()
self.title("ComConfigCopy")
self.geometry("900x700")
# Добавляем VERSION как атрибут класса
self.VERSION = VERSION
# Инициализация проверки обновлений
self.update_checker = UpdateChecker(
VERSION,
"https://gitea.filow.ru/LowaSC/ComConfigCopy",
include_prereleases=False # Проверяем только стабильные версии
)
# Настройка стиля
self.style = ttk.Style(self)
self.style.theme_use("clam")
default_font = ("Segoe UI", 10)
self.option_add("*Font", default_font)
self.settings = settings
self.connection = None
self.tftp_server = None
self.create_menu()
self.create_tabs()
# Проверка первого запуска
self.check_first_run()
# Создание меню
def create_menu(self):
menubar = tk.Menu(self)
self.config(menu=menubar)
# Меню "Файл"
file_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Файл", menu=file_menu)
file_menu.add_command(label="Настройки", command=self.open_settings)
file_menu.add_separator()
file_menu.add_command(label="Выход", command=self.quit)
# Меню "Справка"
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Справка", menu=help_menu)
help_menu.add_command(label="Проверить обновления", command=self.check_for_updates)
help_menu.add_separator()
help_menu.add_command(label="О программе", command=self.open_about)
# Проверка наличия обновлений
def check_for_updates(self):
def on_update_check(update_available, error):
if error:
messagebox.showerror(
"Ошибка проверки обновлений",
f"Не удалось проверить наличие обновлений:\n{error}"
)
elif update_available:
release_info = self.update_checker.get_release_notes()
if release_info:
# Форматируем сообщение
message = (
f"Доступна новая версия {release_info['version']}!\n\n"
f"Тип релиза: {'Пре-релиз' if release_info['type'] == 'prerelease' else 'Стабильный'}\n\n"
"Изменения:\n"
f"{release_info['description']}\n\n"
"Хотите перейти на страницу загрузки?"
)
response = messagebox.askyesno(
"Доступно обновление",
message,
)
if response:
webbrowser.open(release_info["link"])
else:
messagebox.showerror(
"Ошибка",
"Не удалось получить информацию о новой версии"
)
else:
messagebox.showinfo(
"Проверка обновлений",
"У вас установлена последняя версия программы"
)
self.update_checker.check_updates(callback=on_update_check)
# Обработчик изменения настроек
def on_settings_changed(self):
self.settings = settings_load()
# Открытие окна настроек
def open_settings(self):
settings_window = SettingsWindow(self, self.settings, self.on_settings_changed)
settings_window.transient(self)
settings_window.grab_set()
# Проверка первого запуска программы
def check_first_run(self):
show_settings = False
# Проверяем существование файла настроек
if not os.path.exists(SETTINGS_FILE):
show_settings = True
else:
# Проверяем содержимое файла настроек
try:
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
settings = json.load(f)
# Если порт не настроен, считаем это первым запуском
if settings.get("port") is None:
show_settings = True
except Exception:
# Если файл поврежден или не читается, тоже показываем настройки
show_settings = True
if show_settings:
# Создаем папку Settings, если её нет
os.makedirs("Settings", exist_ok=True)
response = messagebox.askyesno(
"Первый запуск",
"Это первый запуск программы. Хотите настроить параметры подключения сейчас?"
)
if response:
self.open_settings()
# Создание вкладок
def create_tabs(self):
self.notebook = ttk.Notebook(self)
self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Создаем вкладки
interactive_frame = ttk.Frame(self.notebook)
file_exec_frame = ttk.Frame(self.notebook)
config_editor_frame = ttk.Frame(self.notebook)
tftp_frame = ttk.Frame(self.notebook)
self.notebook.add(interactive_frame, text="Интерактивный режим")
self.notebook.add(file_exec_frame, text="Выполнение файла")
self.notebook.add(config_editor_frame, text="Редактор конфигурации")
self.notebook.add(tftp_frame, text="TFTP Сервер")
self.create_interactive_tab(interactive_frame)
self.create_file_exec_tab(file_exec_frame)
self.create_config_editor_tab(config_editor_frame)
self.create_tftp_tab(tftp_frame)
# Создание вкладки "Интерактивный режим"
def create_interactive_tab(self, frame):
control_frame = ttk.Frame(frame)
control_frame.pack(fill=X, pady=5)
ttk.Button(control_frame, text="Подключиться", command=self.connect_device).pack(side=LEFT, padx=5)
ttk.Button(control_frame, text="Отключиться", command=self.disconnect_device).pack(side=LEFT, padx=5)
self.interactive_text = CustomText(frame, wrap="word", height=20)
self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
input_frame = ttk.Frame(frame)
input_frame.pack(fill=X, pady=5)
ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5)
self.command_entry = CustomEntry(input_frame, width=50)
self.command_entry.pack(side=LEFT, padx=5)
ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5)
# Подключение к устройству
def connect_device(self):
if self.connection:
messagebox.showinfo("Информация", "Уже подключено.")
return
if not self.settings.get("port"):
messagebox.showerror("Ошибка", "COM-порт не выбран!")
return
self.connection = create_connection(self.settings)
if self.connection:
self.append_interactive_text("[INFO] Подключение установлено.\n")
else:
self.append_interactive_text("[ERROR] Не удалось установить соединение.\n")
# Отключение от устройства
def disconnect_device(self):
if self.connection:
try:
self.connection.close()
except Exception:
pass
self.connection = None
self.append_interactive_text("[INFO] Соединение закрыто.\n")
else:
messagebox.showinfo("Информация", "Соединение не установлено.")
# Отправка команды
def send_command(self):
if not self.connection:
messagebox.showerror("Ошибка", "Сначала установите соединение!")
return
cmd = self.command_entry.get().strip()
if not cmd:
return
self.append_interactive_text(f"[INFO] Отправка команды: {cmd}\n")
threading.Thread(target=self.process_command, args=(cmd,), daemon=True).start()
# Обработка команды
def process_command(self, cmd):
try:
success, response = send_command_and_process_response(
self.connection,
cmd,
self.settings.get("timeout", 10),
max_attempts=3,
log_callback=self.append_interactive_text,
login=self.settings.get("login"),
password=self.settings.get("password"),
is_gui=True
)
except SerialException as e:
self.append_interactive_text(f"[ERROR] Ошибка при отправке команды: {e}\n")
logging.error(f"Ошибка отправки команды: {e}", exc_info=True)
except Exception as e:
self.append_interactive_text(f"[ERROR] Неизвестная ошибка: {e}\n")
logging.error(f"Неизвестная ошибка: {e}", exc_info=True)
# Добавление текста в текстовое поле
def append_interactive_text(self, text):
append_text_to_widget(self.interactive_text, text)
# Создание вкладки "Выполнить команды из файла"
def create_file_exec_tab(self, frame):
file_frame = ttk.Frame(frame)
file_frame.pack(fill=X, pady=5)
ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
self.file_exec_var = StringVar(value=self.settings.get("config_file") or "")
CustomEntry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5)
ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5)
ttk.Button(frame, text="Выполнить команды", command=self.execute_file_commands).pack(pady=5)
self.file_exec_text = CustomText(frame, wrap="word", height=15)
self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Выбор файла конфигурации для выполнения команд
def select_config_file_fileexec(self):
select_config_file(self, self.file_exec_var)
# Выполнение команд из файла
def execute_file_commands(self):
if not self.settings.get("port"):
messagebox.showerror("Ошибка", "COM-порт не выбран!")
return
if not self.file_exec_var.get():
messagebox.showerror("Ошибка", "Файл конфигурации не выбран!")
return
if not self.connection:
self.connection = create_connection(self.settings)
if not self.connection:
self.append_file_exec_text("[ERROR] Не удалось установить соединение.\n")
return
threading.Thread(
target=execute_commands_from_file,
args=(
self.connection,
self.file_exec_var.get(),
self.settings.get("timeout", 10),
self.settings.get("copy_mode", "line"),
self.settings.get("block_size", 15),
self.append_file_exec_text,
self.settings.get("login"),
self.settings.get("password"),
True,
),
daemon=True,
).start()
def append_file_exec_text(self, text):
append_text_to_widget(self.file_exec_text, text)
# Создание вкладки "Редактор конфигурационного файла"
def create_config_editor_tab(self, frame):
top_frame = ttk.Frame(frame)
top_frame.pack(fill=X, pady=5)
ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
self.editor_file_var = StringVar(value=self.settings.get("config_file") or "")
CustomEntry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5)
self.config_editor_text = CustomText(frame, wrap="word")
self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Выбор файла конфигурации для редактирования
def select_config_file_editor(self):
select_config_file(self, self.editor_file_var, save_to_settings=True)
# Загрузка файла конфигурации
def load_config_file(self):
filename = self.editor_file_var.get()
if not filename or not os.path.exists(filename):
messagebox.showerror("Ошибка", "Файл конфигурации не выбран или не существует.")
return
try:
with open(filename, "r", encoding="utf-8") as f:
content = f.read()
self.config_editor_text.delete("1.0", END)
self.config_editor_text.insert(END, content)
messagebox.showinfo("Информация", "Файл загружен.")
except Exception as e:
logging.error(f"Ошибка загрузки файла: {e}", exc_info=True)
messagebox.showerror("Ошибка", f"Не удалось загрузить файл:\n{e}")
# Сохранение файла конфигурации
def save_config_file(self):
filename = self.editor_file_var.get()
if not filename:
messagebox.showerror("Ошибка", "Файл конфигурации не выбран.")
return
try:
content = self.config_editor_text.get("1.0", END)
with open(filename, "w", encoding="utf-8") as f:
f.write(content)
messagebox.showinfo("Информация", "Файл сохранён.")
except Exception as e:
logging.error(f"Ошибка сохранения файла: {e}", exc_info=True)
messagebox.showerror("Ошибка", f"Не удалось сохранить файл:\n{e}")
# Открытие окна "О программе"
def open_about(self):
about_window = AboutWindow(self)
about_window.transient(self)
about_window.grab_set()
# Создание вкладки TFTP сервера
def create_tftp_tab(self, frame):
# Создаем фрейм для управления TFTP сервером
tftp_frame = ttk.Frame(frame)
tftp_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Создаем и размещаем элементы управления
controls_frame = ttk.LabelFrame(tftp_frame, text="Управление TFTP сервером")
controls_frame.pack(fill=X, padx=5, pady=5)
# IP адрес
ip_frame = ttk.Frame(controls_frame)
ip_frame.pack(fill=X, padx=5, pady=2)
ttk.Label(ip_frame, text="IP адрес:").pack(side=LEFT, padx=5)
self.tftp_ip_var = StringVar(value="0.0.0.0")
self.tftp_ip_combo = ttk.Combobox(ip_frame, textvariable=self.tftp_ip_var, state="readonly")
self.tftp_ip_combo.pack(side=LEFT, fill=X, expand=True, padx=5)
ttk.Button(ip_frame, text="Обновить", command=self.update_network_adapters).pack(side=LEFT, padx=5)
# Заполняем список адаптеров
self.update_network_adapters()
# Порт
port_frame = ttk.Frame(controls_frame)
port_frame.pack(fill=X, padx=5, pady=2)
ttk.Label(port_frame, text="Порт:").pack(side=LEFT, padx=5)
self.tftp_port_var = StringVar(value="69")
self.tftp_port_entry = CustomEntry(port_frame, textvariable=self.tftp_port_var)
self.tftp_port_entry.pack(fill=X, expand=True, padx=5)
# Кнопки управления
buttons_frame = ttk.Frame(controls_frame)
buttons_frame.pack(fill=X, padx=5, pady=5)
self.start_tftp_button = ttk.Button(
buttons_frame,
text="Запустить сервер",
command=self.start_tftp_server
)
self.start_tftp_button.pack(side=LEFT, padx=5)
self.stop_tftp_button = ttk.Button(
buttons_frame,
text="Остановить сервер",
command=self.stop_tftp_server,
state="disabled"
)
self.stop_tftp_button.pack(side=LEFT, padx=5)
# Лог сервера
log_frame = ttk.LabelFrame(tftp_frame, text="Лог сервера")
log_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
self.tftp_log_text = CustomText(log_frame, wrap=tk.WORD, height=10)
self.tftp_log_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Добавляем скроллбар для лога
scrollbar = ttk.Scrollbar(self.tftp_log_text, command=self.tftp_log_text.yview)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self.tftp_log_text.config(yscrollcommand=scrollbar.set)
# Статус передач
transfers_frame = ttk.LabelFrame(tftp_frame, text="Активные передачи")
transfers_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Создаем таблицу для отображения активных передач
columns = ("client", "filename", "progress", "remaining", "time")
self.transfers_tree = ttk.Treeview(transfers_frame, columns=columns, show="headings")
# Настраиваем заголовки колонок
self.transfers_tree.heading("client", text="Клиент")
self.transfers_tree.heading("filename", text="Файл")
self.transfers_tree.heading("progress", text="Прогресс")
self.transfers_tree.heading("remaining", text="Осталось")
self.transfers_tree.heading("time", text="Время")
# Настраиваем ширину колонок
self.transfers_tree.column("client", width=120)
self.transfers_tree.column("filename", width=150)
self.transfers_tree.column("progress", width=100)
self.transfers_tree.column("remaining", width=100)
self.transfers_tree.column("time", width=80)
self.transfers_tree.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Инициализация TFTP сервера
self.tftp_server = None
self.tftp_server_thread = None
# Запуск TFTP сервера
def start_tftp_server(self):
try:
# Получаем выбранный IP-адрес
ip = self.tftp_ip_var.get()
if not ip:
messagebox.showerror("Ошибка", "Выберите IP-адрес для TFTP сервера")
return
# Проверяем корректность порта
try:
port = int(self.tftp_port_var.get())
if port <= 0 or port > 65535:
raise ValueError("Порт должен быть в диапазоне 1-65535")
except ValueError as e:
messagebox.showerror("Ошибка", f"Некорректный порт: {str(e)}")
return
# Создаем экземпляр TFTP сервера
self.tftp_server = TFTPServer("Firmware")
# Устанавливаем callback для логирования
def log_callback(message):
# Фильтруем дублирующиеся сообщения о запуске/остановке сервера
if "[INFO] TFTP сервер запущен" in message and hasattr(self, '_server_started'):
return
if "[INFO] TFTP сервер остановлен" in message and hasattr(self, '_server_stopped'):
return
self.append_tftp_log(message)
# Устанавливаем флаги для отслеживания состояния
if "[INFO] TFTP сервер запущен" in message:
self._server_started = True
elif "[INFO] TFTP сервер остановлен" in message:
self._server_stopped = True
# Обновляем информацию о передачах
self.update_transfers_info()
self.tftp_server.set_log_callback(log_callback)
# Запускаем сервер в отдельном потоке
self.tftp_server_thread = threading.Thread(
target=self.run_tftp_server,
args=(ip, port),
daemon=True
)
self.tftp_server_thread.start()
# Обновляем состояние кнопок и элементов управления
self.start_tftp_button.config(state="disabled")
self.stop_tftp_button.config(state="normal")
self.tftp_ip_combo.config(state="disabled")
self.tftp_port_entry.config(state="disabled")
# Запускаем периодическое обновление информации о передачах
self.update_transfers_periodically()
except Exception as e:
self.append_tftp_log(f"[ERROR] Ошибка запуска сервера: {str(e)}")
messagebox.showerror("Ошибка", f"Не удалось запустить TFTP сервер: {str(e)}")
# Запуск TFTP сервера в отдельном потоке
def run_tftp_server(self, ip, port):
try:
self.tftp_server.start_server(ip, port)
except Exception as e:
self.append_tftp_log(f"[ERROR] Ошибка работы сервера: {str(e)}")
# Остановка TFTP сервера
def stop_tftp_server(self):
if self.tftp_server:
try:
# Отключаем кнопки на время остановки сервера
self.start_tftp_button.config(state="disabled")
self.stop_tftp_button.config(state="disabled")
# Сбрасываем флаги состояния
if hasattr(self, '_server_started'):
delattr(self, '_server_started')
if hasattr(self, '_server_stopped'):
delattr(self, '_server_stopped')
# Останавливаем сервер
self.tftp_server.stop_server()
# Ждем завершения потока сервера с таймаутом
if self.tftp_server_thread:
self.tftp_server_thread.join(timeout=5.0)
if self.tftp_server_thread.is_alive():
self.append_tftp_log("[WARN] Превышено время ожидания остановки сервера")
# Очищаем ссылки на сервер и поток
self.tftp_server = None
self.tftp_server_thread = None
# Обновляем состояние кнопок
self.start_tftp_button.config(state="normal")
self.stop_tftp_button.config(state="disabled")
self.tftp_ip_combo.config(state="normal")
self.tftp_port_entry.config(state="normal")
# Очищаем таблицу передач
for item in self.transfers_tree.get_children():
self.transfers_tree.delete(item)
except Exception as e:
self.append_tftp_log(f"[ERROR] Ошибка остановки сервера: {str(e)}")
messagebox.showerror("Ошибка", f"Не удалось остановить TFTP сервер: {str(e)}")
# Восстанавливаем состояние кнопок в случае ошибки
self.start_tftp_button.config(state="disabled")
self.stop_tftp_button.config(state="normal")
def append_tftp_log(self, text):
append_text_to_widget(self.tftp_log_text, text)
# Обновление информации об активных передачах
def update_transfers_info(self):
if not self.tftp_server:
return
# Очищаем текущие записи
for item in self.transfers_tree.get_children():
self.transfers_tree.delete(item)
# Добавляем информацию о текущих передачах
for client_addr, transfer_info in self.tftp_server.active_transfers.items():
filename = transfer_info['filename']
bytes_sent = transfer_info['bytes_sent']
filesize = transfer_info['filesize']
start_time = transfer_info['start_time']
# Вычисляем прогресс
progress = f"{bytes_sent}/{filesize} байт"
remaining_bytes = filesize - bytes_sent
elapsed_time = time.time() - start_time
# Вычисляем скорость передачи (байт/сек)
if elapsed_time > 0:
transfer_speed = bytes_sent / elapsed_time
# Вычисляем оставшееся время
if transfer_speed > 0:
remaining_time = remaining_bytes / transfer_speed
remaining_str = f"{remaining_bytes} байт (~{int(remaining_time)}с)"
else:
remaining_str = f"{remaining_bytes} байт (неизвестно)"
else:
remaining_str = f"{remaining_bytes} байт (вычисляется...)"
# Добавляем запись в таблицу
self.transfers_tree.insert("", END, values=(
f"{client_addr[0]}:{client_addr[1]}",
filename,
progress,
remaining_str,
f"{elapsed_time:.1f}с"
))
# Периодическое обновление информации о передачах
def update_transfers_periodically(self):
if self.tftp_server and self.tftp_server.running:
self.update_transfers_info()
# Планируем следующее обновление через 1 секунду
self.after(1000, self.update_transfers_periodically)
# Обновление списка сетевых адаптеров
def update_network_adapters(self):
adapters = get_network_adapters()
self.tftp_ip_combo["values"] = adapters
if not self.tftp_ip_var.get() in adapters:
self.tftp_ip_var.set(adapters[0])
# ==========================
# Основной запуск приложения
# ==========================
def main():
setup_logging()
settings = settings_load()
app = SerialAppGUI(settings)
app.mainloop()
# ==========================
# Основной запуск приложения
# ==========================
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
logging.info("Программа прервана пользователем (KeyboardInterrupt).")
sys.exit(0)
except Exception as e:
logging.critical(f"Неизвестная ошибка: {e}", exc_info=True)
sys.exit(1)