Merge pull request 'v1.0.1' (#4) from v1.0.1 into main

Reviewed-on: #4
This commit was merged in pull request #4.
This commit is contained in:
2025-02-16 22:02:20 +00:00
3 changed files with 331 additions and 328 deletions

View File

@@ -5,11 +5,6 @@
# Это программа для копирования конфигураций на коммутаторы # Это программа для копирования конфигураций на коммутаторы
# ------------------------------------------------------------ # ------------------------------------------------------------
# import argparse Использовался для получения аргументов из командной строки (не используется)
# import platform Использовался для получения списка сетевых адаптеров (не используется)
# import subprocess Использовался для получения списка сетевых адаптеров (не используется)
# import socket не используется
import json import json
import logging import logging
import os import os
@@ -39,12 +34,11 @@ import serial.tools.list_ports
from serial.serialutil import SerialException from serial.serialutil import SerialException
from about_window import AboutWindow from about_window import AboutWindow
from TFTPServer import TFTPServer from TFTPServer import TFTPServer
# from TFTPServer import TFTPServerThread
import socket import socket
from update_checker import UpdateChecker from update_checker import UpdateChecker
# Версия программы # Версия программы
VERSION = "1.0.0" VERSION = "1.0.1"
# Создаем необходимые папки # Создаем необходимые папки
os.makedirs("Logs", exist_ok=True) os.makedirs("Logs", exist_ok=True)
@@ -56,12 +50,8 @@ os.makedirs("docs", exist_ok=True)
# Файл настроек находится в папке Settings # Файл настроек находится в папке Settings
SETTINGS_FILE = os.path.join("Settings", "settings.json") SETTINGS_FILE = os.path.join("Settings", "settings.json")
# ========================== # Настройка логирования с использованием RotatingFileHandler.
# Функции работы с настройками и логированием
# ==========================
def setup_logging(): def setup_logging():
"""Настройка логирования с использованием RotatingFileHandler."""
logger = logging.getLogger() logger = logging.getLogger()
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
log_path = os.path.join("Logs", "app.log") log_path = os.path.join("Logs", "app.log")
@@ -72,8 +62,8 @@ def setup_logging():
handler.setFormatter(formatter) handler.setFormatter(formatter)
logger.addHandler(handler) logger.addHandler(handler)
# Загрузка настроек из JSON-файла или создание настроек по умолчанию.
def settings_load(): def settings_load():
"""Загрузка настроек из JSON-файла или создание настроек по умолчанию."""
default_settings = { default_settings = {
"port": None, # Порт для подключения "port": None, # Порт для подключения
"baudrate": 9600, # Скорость передачи данных "baudrate": 9600, # Скорость передачи данных
@@ -126,8 +116,8 @@ def settings_load():
logging.error(f"Ошибка загрузки файла настроек: {e}", exc_info=True) logging.error(f"Ошибка загрузки файла настроек: {e}", exc_info=True)
return default_settings return default_settings
# Сохранение настроек в JSON-файл
def settings_save(settings): def settings_save(settings):
"""Сохранение настроек в JSON-файл."""
try: try:
with open(SETTINGS_FILE, "w", encoding="utf-8") as f: with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
json.dump(settings, f, indent=4, ensure_ascii=False) json.dump(settings, f, indent=4, ensure_ascii=False)
@@ -135,18 +125,15 @@ def settings_save(settings):
except Exception as e: except Exception as e:
logging.error(f"Ошибка при сохранении настроек: {e}", exc_info=True) logging.error(f"Ошибка при сохранении настроек: {e}", exc_info=True)
# Получение списка доступных последовательных портов
def list_serial_ports(): def list_serial_ports():
"""Получение списка доступных последовательных портов.""" """Получение списка доступных последовательных портов."""
ports = serial.tools.list_ports.comports() ports = serial.tools.list_ports.comports()
logging.debug(f"Найдено {len(ports)} серийных портов.") logging.debug(f"Найдено {len(ports)} серийных портов.")
return [port.device for port in ports] return [port.device for port in ports]
# ========================== # Получение списка IP-адресов из сетевых адаптеров
# Функции работы с сетевыми адаптерами (не используются)
# ==========================
def get_network_adapters(): def get_network_adapters():
"""Получение списка сетевых адаптеров и их IP-адресов."""
adapters = [] adapters = []
try: try:
# Получаем имя хоста # Получаем имя хоста
@@ -177,12 +164,8 @@ def get_network_adapters():
return adapters return adapters
# ========================== # Создание соединения с устройством через последовательный порт
# Функции работы с COM-соединением
# ==========================
def create_connection(settings): def create_connection(settings):
"""Создание соединения с устройством через последовательный порт."""
try: try:
conn = serial.Serial(settings["port"], settings["baudrate"], timeout=1) conn = serial.Serial(settings["port"], settings["baudrate"], timeout=1)
logging.info(f"Соединение установлено с {settings['port']} на {settings['baudrate']}.") logging.info(f"Соединение установлено с {settings['port']} на {settings['baudrate']}.")
@@ -194,12 +177,8 @@ def create_connection(settings):
logging.error(f"Неизвестная ошибка подключения: {e}", exc_info=True) logging.error(f"Неизвестная ошибка подключения: {e}", exc_info=True)
return None return None
# Проверка наличия логина и пароля в настройках и отправка их на устройство # Проверка наличия логина и пароля в настройках и отправка их на устройство
def send_login_password(serial_connection, login=None, password=None, is_gui=False): def send_login_password(serial_connection, login=None, password=None, is_gui=False):
"""Отправка логина и пароля на устройство."""
if not login: if not login:
if is_gui: if is_gui:
login = simpledialog.askstring("Login", "Введите логин:") login = simpledialog.askstring("Login", "Введите логин:")
@@ -216,14 +195,8 @@ def send_login_password(serial_connection, login=None, password=None, is_gui=Fal
else: else:
password = getpass("Введите пароль: ") password = getpass("Введите пароль: ")
# Чтение ответа от устройства с учётом таймаута. # Чтение ответа от устройства с учётом таймаута.
def read_response(serial_connection, timeout, login=None, password=None, is_gui=False): def read_response(serial_connection, timeout, login=None, password=None, is_gui=False):
"""
Чтение ответа от устройства с учётом таймаута.
Если в последней строке появляется запрос логина или пароля, данные отправляются автоматически.
"""
response = b"" response = b""
end_time = time.time() + timeout end_time = time.time() + timeout
decoded = "" decoded = ""
@@ -260,8 +233,8 @@ def read_response(serial_connection, timeout, login=None, password=None, is_gui=
time.sleep(0.1) time.sleep(0.1)
return decoded return decoded
# Генерация блоков команд для блочного копирования
def generate_command_blocks(lines, block_size): def generate_command_blocks(lines, block_size):
"""Генерация блоков команд для блочного копирования."""
blocks = [] blocks = []
current_block = [] current_block = []
for line in lines: for line in lines:
@@ -287,6 +260,7 @@ def generate_command_blocks(lines, block_size):
blocks.append("\n".join(current_block)) blocks.append("\n".join(current_block))
return blocks return blocks
# Выполнение команд из файла конфигурации
def execute_commands_from_file( def execute_commands_from_file(
serial_connection, serial_connection,
filename, filename,
@@ -298,12 +272,6 @@ def execute_commands_from_file(
password=None, password=None,
is_gui=False, is_gui=False,
): ):
"""
Выполнение команд из файла конфигурации.
Если передан log_callback, вывод будет отображаться в GUI.
Теперь при обнаружении ошибки (например, если в ответе присутствует символ '^')
команда будет отправляться повторно.
"""
try: try:
with open(filename, "r", encoding="utf-8") as file: with open(filename, "r", encoding="utf-8") as file:
lines = [line for line in file if line.strip()] lines = [line for line in file if line.strip()]
@@ -442,18 +410,16 @@ def execute_commands_from_file(
log_callback(msg) log_callback(msg)
logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True) logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True)
# ========================== # Базовый класс для кастомных виджетов с поддержкой контекстного меню и горячих клавиш
# Улучшенные текстовые виджеты class CustomWidgetBase:
# ==========================
def __init__(self):
class CustomText(tk.Text):
"""Улучшенный текстовый виджет с расширенной функциональностью копирования/вставки"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.create_context_menu() self.create_context_menu()
self.bind_shortcuts() self.bind_shortcuts()
# Создание контекстного меню
def create_context_menu(self): def create_context_menu(self):
"""Создание контекстного меню"""
self.context_menu = tk.Menu(self, tearoff=0) self.context_menu = tk.Menu(self, tearoff=0)
self.context_menu.add_command(label="Вырезать", command=self.cut) self.context_menu.add_command(label="Вырезать", command=self.cut)
self.context_menu.add_command(label="Копировать", command=self.copy) self.context_menu.add_command(label="Копировать", command=self.copy)
@@ -462,90 +428,63 @@ class CustomText(tk.Text):
self.context_menu.add_command(label="Выделить всё", command=self.select_all) self.context_menu.add_command(label="Выделить всё", command=self.select_all)
self.bind("<Button-3>", self.show_context_menu) self.bind("<Button-3>", self.show_context_menu)
def show_context_menu(self, event): # Привязка горячих клавиш
self.context_menu.post(event.x_root, event.y_root)
def bind_shortcuts(self): def bind_shortcuts(self):
"""Привязка горячих клавиш"""
# Стандартные сочетания # Стандартные сочетания
self.bind("<Control-x>", lambda e: self.event_generate("<<Cut>>")) self.bind("<Control-x>", lambda e: self.event_generate("<<Cut>>"))
self.bind("<Control-c>", lambda e: self.event_generate("<<Copy>>")) self.bind("<Control-c>", lambda e: self.event_generate("<<Copy>>"))
self.bind("<Control-v>", lambda e: self.event_generate("<<Paste>>")) self.bind("<Control-v>", lambda e: self.event_generate("<<Paste>>"))
self.bind("<Control-a>", self.select_all) self.bind("<Control-a>", self.select_all)
# Shift+Insert для вставки # Дополнительные сочетания
self.bind("<Shift-Insert>", lambda e: self.event_generate("<<Paste>>")) self.bind("<Shift-Insert>", lambda e: self.event_generate("<<Paste>>"))
# Ctrl+Insert для копирования
self.bind("<Control-Insert>", lambda e: self.event_generate("<<Copy>>")) self.bind("<Control-Insert>", lambda e: self.event_generate("<<Copy>>"))
# Shift+Delete для вырезания
self.bind("<Shift-Delete>", lambda e: self.event_generate("<<Cut>>")) self.bind("<Shift-Delete>", lambda e: self.event_generate("<<Cut>>"))
# Отображение контекстного меню
def show_context_menu(self, event):
"""Отображение контекстного меню"""
self.context_menu.post(event.x_root, event.y_root)
# Вырезание текста
def cut(self): def cut(self):
self.event_generate("<<Cut>>") self.event_generate("<<Cut>>")
# Копирование текста
def copy(self): def copy(self):
self.event_generate("<<Copy>>") self.event_generate("<<Copy>>")
# Вставка текста
def paste(self): def paste(self):
self.event_generate("<<Paste>>") self.event_generate("<<Paste>>")
# Класс для текстового поля с поддержкой контекстного меню и горячих клавиш
class CustomText(tk.Text, CustomWidgetBase):
def __init__(self, *args, **kwargs):
tk.Text.__init__(self, *args, **kwargs)
CustomWidgetBase.__init__(self)
def select_all(self, event=None): def select_all(self, event=None):
"""Выделение всего текста в Text виджете"""
self.tag_add(tk.SEL, "1.0", tk.END) self.tag_add(tk.SEL, "1.0", tk.END)
self.mark_set(tk.INSERT, "1.0") self.mark_set(tk.INSERT, "1.0")
self.see(tk.INSERT) self.see(tk.INSERT)
return "break" return "break"
class CustomEntry(ttk.Entry): # Класс для поля ввода с поддержкой контекстного меню и горячих клавиш
"""Улучшенное поле ввода с расширенной функциональностью копирования/вставки""" class CustomEntry(ttk.Entry, CustomWidgetBase):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) ttk.Entry.__init__(self, *args, **kwargs)
self.create_context_menu() CustomWidgetBase.__init__(self)
self.bind_shortcuts()
def create_context_menu(self):
self.context_menu = tk.Menu(self, tearoff=0)
self.context_menu.add_command(label="Вырезать", command=self.cut)
self.context_menu.add_command(label="Копировать", command=self.copy)
self.context_menu.add_command(label="Вставить", command=self.paste)
self.context_menu.add_separator()
self.context_menu.add_command(label="Выделить всё", command=self.select_all)
self.bind("<Button-3>", self.show_context_menu)
def show_context_menu(self, event):
self.context_menu.post(event.x_root, event.y_root)
def bind_shortcuts(self):
# Стандартные сочетания
self.bind("<Control-x>", lambda e: self.event_generate("<<Cut>>"))
self.bind("<Control-c>", lambda e: self.event_generate("<<Copy>>"))
self.bind("<Control-v>", lambda e: self.event_generate("<<Paste>>"))
self.bind("<Control-a>", self.select_all)
# Shift+Insert для вставки
self.bind("<Shift-Insert>", lambda e: self.event_generate("<<Paste>>"))
# Ctrl+Insert для копирования
self.bind("<Control-Insert>", lambda e: self.event_generate("<<Copy>>"))
# Shift+Delete для вырезания
self.bind("<Shift-Delete>", lambda e: self.event_generate("<<Cut>>"))
def cut(self):
self.event_generate("<<Cut>>")
def copy(self):
self.event_generate("<<Copy>>")
def paste(self):
self.event_generate("<<Paste>>")
def select_all(self, event=None): def select_all(self, event=None):
"""Выделение всего текста в Entry виджете"""
self.select_range(0, tk.END) self.select_range(0, tk.END)
return "break" return "break"
# Класс для окна настроек
class SettingsWindow(tk.Toplevel): class SettingsWindow(tk.Toplevel):
def __init__(self, parent, settings, callback=None): def __init__(self, parent, settings, callback=None):
super().__init__(parent) super().__init__(parent)
@@ -585,15 +524,16 @@ class SettingsWindow(tk.Toplevel):
copy_mode_frame = ttk.Frame(settings_frame) copy_mode_frame = ttk.Frame(settings_frame)
copy_mode_frame.grid(row=3, column=1, sticky=W, pady=5) copy_mode_frame.grid(row=3, column=1, sticky=W, pady=5)
ttk.Radiobutton(copy_mode_frame, text="Построчно", value="line", ttk.Radiobutton(copy_mode_frame, text="Построчно", value="line",
variable=self.copy_mode_var).pack(side=LEFT) variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT)
ttk.Radiobutton(copy_mode_frame, text="Блоками", value="block", ttk.Radiobutton(copy_mode_frame, text="Блоками", value="block",
variable=self.copy_mode_var).pack(side=LEFT) variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT)
# Размер блока # Размер блока
ttk.Label(settings_frame, text="Размер блока:").grid(row=4, column=0, sticky=W, pady=5) self.block_size_label = ttk.Label(settings_frame, text="Размер блока:")
self.block_size_label.grid(row=4, column=0, sticky=W, pady=5)
self.block_size_var = StringVar(value=str(settings.get("block_size", 15))) self.block_size_var = StringVar(value=str(settings.get("block_size", 15)))
block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var) self.block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var)
block_size_entry.grid(row=4, column=1, sticky=W, pady=5) self.block_size_entry.grid(row=4, column=1, sticky=W, pady=5)
# Приглашение командной строки # Приглашение командной строки
ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5) ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5)
@@ -609,9 +549,22 @@ class SettingsWindow(tk.Toplevel):
self.update_ports() self.update_ports()
# Инициализация видимости поля размера блока
self.toggle_block_size()
# Центрируем окно # Центрируем окно
self.center_window() self.center_window()
# Переключение видимости поля размера блока
def toggle_block_size(self):
if self.copy_mode_var.get() == "line":
self.block_size_label.grid_remove()
self.block_size_entry.grid_remove()
else:
self.block_size_label.grid()
self.block_size_entry.grid()
# Центрирование окна
def center_window(self): def center_window(self):
self.update_idletasks() self.update_idletasks()
width = self.winfo_width() width = self.winfo_width()
@@ -620,12 +573,14 @@ class SettingsWindow(tk.Toplevel):
y = (self.winfo_screenheight() // 2) - (height // 2) y = (self.winfo_screenheight() // 2) - (height // 2)
self.geometry(f"{width}x{height}+{x}+{y}") self.geometry(f"{width}x{height}+{x}+{y}")
# Обновление списка доступных последовательных портов
def update_ports(self): def update_ports(self):
ports = list_serial_ports() ports = list_serial_ports()
self.port_combo["values"] = ports self.port_combo["values"] = ports
if ports and not self.port_var.get(): if ports and not self.port_var.get():
self.port_var.set(ports[0]) self.port_var.set(ports[0])
# Сохранение настроек
def save_settings(self): def save_settings(self):
try: try:
self.settings.update({ self.settings.update({
@@ -644,6 +599,78 @@ class SettingsWindow(tk.Toplevel):
except ValueError as e: except ValueError as e:
messagebox.showerror("Ошибка", "Проверьте правильность введенных значений") messagebox.showerror("Ошибка", "Проверьте правильность введенных значений")
# Общая функция для добавления текста в текстовое поле
def append_text_to_widget(widget, text):
# Проверяем, заканчивается ли текст символом новой строки
if not text.endswith('\n'):
text += '\n'
widget.insert(END, text)
widget.see(END)
# Общая функция для выбора файла конфигурации
def select_config_file(self, var, save_to_settings=False):
filename = filedialog.askopenfilename(
title="Выберите файл конфигурации",
filetypes=[("Text files", "*.txt")]
)
if filename:
var.set(filename)
if save_to_settings:
self.settings["config_file"] = filename
settings_save(self.settings)
# Общая функция для отправки команды и обработки ответа
def send_command_and_process_response(
serial_connection,
cmd,
timeout,
max_attempts=3,
log_callback=None,
login=None,
password=None,
is_gui=False
):
attempt = 0
while attempt < max_attempts:
msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n"
if log_callback:
log_callback(msg)
serial_connection.write((cmd + "\n").encode())
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})")
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
if response:
if '^' in response:
msg = (
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
f"Ответ устройства:\n{response}\n"
f"Повторная отправка команды...\n"
)
if log_callback:
log_callback(msg)
logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.")
attempt += 1
time.sleep(1)
continue
else:
msg = f"Ответ устройства:\n{response}\n"
if log_callback:
log_callback(msg)
logging.info(f"Ответ устройства:\n{response}")
return True, response
else:
msg = f"Ответ не получен для команды: {cmd}\n"
if log_callback:
log_callback(msg)
logging.warning(f"Нет ответа для команды: {cmd}")
return False, None
msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n"
if log_callback:
log_callback(msg)
logging.error(msg)
return False, None
# Основной класс для графического интерфейса
class SerialAppGUI(tk.Tk): class SerialAppGUI(tk.Tk):
def __init__(self, settings): def __init__(self, settings):
super().__init__() super().__init__()
@@ -656,7 +683,8 @@ class SerialAppGUI(tk.Tk):
# Инициализация проверки обновлений # Инициализация проверки обновлений
self.update_checker = UpdateChecker( self.update_checker = UpdateChecker(
VERSION, VERSION,
"https://gitea.filow.ru/LowaSC/ComConfigCopy" "https://gitea.filow.ru/LowaSC/ComConfigCopy",
include_prereleases=False # Проверяем только стабильные версии
) )
# Настройка стиля # Настройка стиля
@@ -675,6 +703,7 @@ class SerialAppGUI(tk.Tk):
# Проверка первого запуска # Проверка первого запуска
self.check_first_run() self.check_first_run()
# Создание меню
def create_menu(self): def create_menu(self):
menubar = tk.Menu(self) menubar = tk.Menu(self)
self.config(menu=menubar) self.config(menu=menubar)
@@ -693,8 +722,8 @@ class SerialAppGUI(tk.Tk):
help_menu.add_separator() help_menu.add_separator()
help_menu.add_command(label="О программе", command=self.open_about) help_menu.add_command(label="О программе", command=self.open_about)
# Проверка наличия обновлений
def check_for_updates(self): def check_for_updates(self):
"""Проверка наличия обновлений"""
def on_update_check(update_available, error): def on_update_check(update_available, error):
if error: if error:
messagebox.showerror( messagebox.showerror(
@@ -704,14 +733,20 @@ class SerialAppGUI(tk.Tk):
elif update_available: elif update_available:
release_info = self.update_checker.get_release_notes() release_info = self.update_checker.get_release_notes()
if release_info: if release_info:
# Форматируем сообщение
message = (
f"Доступна новая версия {release_info['version']}!\n\n"
f"Тип релиза: {'Пре-релиз' if release_info['type'] == 'prerelease' else 'Стабильный'}\n\n"
"Изменения:\n"
f"{release_info['description']}\n\n"
"Хотите перейти на страницу загрузки?"
)
response = messagebox.askyesno( response = messagebox.askyesno(
"Доступно обновление", "Доступно обновление",
f"Доступна новая версия {release_info['version']}!\n\n" message,
f"Изменения:\n{release_info['description']}\n\n"
"Хотите перейти на страницу загрузки?",
) )
if response: if response:
webbrowser.open(release_info["download_url"]) webbrowser.open(release_info["link"])
else: else:
messagebox.showerror( messagebox.showerror(
"Ошибка", "Ошибка",
@@ -725,18 +760,18 @@ class SerialAppGUI(tk.Tk):
self.update_checker.check_updates(callback=on_update_check) self.update_checker.check_updates(callback=on_update_check)
# Обработчик изменения настроек
def on_settings_changed(self): def on_settings_changed(self):
"""Обработчик изменения настроек"""
self.settings = settings_load() self.settings = settings_load()
# Открытие окна настроек
def open_settings(self): def open_settings(self):
"""Открытие окна настроек"""
settings_window = SettingsWindow(self, self.settings, self.on_settings_changed) settings_window = SettingsWindow(self, self.settings, self.on_settings_changed)
settings_window.transient(self) settings_window.transient(self)
settings_window.grab_set() settings_window.grab_set()
# Проверка первого запуска программы
def check_first_run(self): def check_first_run(self):
"""Проверка первого запуска программы"""
show_settings = False show_settings = False
# Проверяем существование файла настроек # Проверяем существование файла настроек
@@ -765,6 +800,7 @@ class SerialAppGUI(tk.Tk):
if response: if response:
self.open_settings() self.open_settings()
# Создание вкладок
def create_tabs(self): def create_tabs(self):
self.notebook = ttk.Notebook(self) self.notebook = ttk.Notebook(self)
self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5) self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5)
@@ -785,7 +821,7 @@ class SerialAppGUI(tk.Tk):
self.create_config_editor_tab(config_editor_frame) self.create_config_editor_tab(config_editor_frame)
self.create_tftp_tab(tftp_frame) self.create_tftp_tab(tftp_frame)
# -------------- Вкладка "Интерактивный режим" -------------- # Создание вкладки "Интерактивный режим"
def create_interactive_tab(self, frame): def create_interactive_tab(self, frame):
control_frame = ttk.Frame(frame) control_frame = ttk.Frame(frame)
control_frame.pack(fill=X, pady=5) control_frame.pack(fill=X, pady=5)
@@ -802,6 +838,7 @@ class SerialAppGUI(tk.Tk):
self.command_entry.pack(side=LEFT, padx=5) self.command_entry.pack(side=LEFT, padx=5)
ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5) ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5)
# Подключение к устройству
def connect_device(self): def connect_device(self):
if self.connection: if self.connection:
messagebox.showinfo("Информация", "Уже подключено.") messagebox.showinfo("Информация", "Уже подключено.")
@@ -815,6 +852,7 @@ class SerialAppGUI(tk.Tk):
else: else:
self.append_interactive_text("[ERROR] Не удалось установить соединение.\n") self.append_interactive_text("[ERROR] Не удалось установить соединение.\n")
# Отключение от устройства
def disconnect_device(self): def disconnect_device(self):
if self.connection: if self.connection:
try: try:
@@ -826,6 +864,7 @@ class SerialAppGUI(tk.Tk):
else: else:
messagebox.showinfo("Информация", "Соединение не установлено.") messagebox.showinfo("Информация", "Соединение не установлено.")
# Отправка команды
def send_command(self): def send_command(self):
if not self.connection: if not self.connection:
messagebox.showerror("Ошибка", "Сначала установите соединение!") messagebox.showerror("Ошибка", "Сначала установите соединение!")
@@ -836,42 +875,19 @@ class SerialAppGUI(tk.Tk):
self.append_interactive_text(f"[INFO] Отправка команды: {cmd}\n") self.append_interactive_text(f"[INFO] Отправка команды: {cmd}\n")
threading.Thread(target=self.process_command, args=(cmd,), daemon=True).start() threading.Thread(target=self.process_command, args=(cmd,), daemon=True).start()
# Обработка команды
def process_command(self, cmd): def process_command(self, cmd):
try: try:
max_attempts = 3 success, response = send_command_and_process_response(
attempt = 0 self.connection,
while attempt < max_attempts: cmd,
self.connection.write((cmd + "\n").encode()) self.settings.get("timeout", 10),
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})") max_attempts=3,
response = read_response( log_callback=self.append_interactive_text,
self.connection, login=self.settings.get("login"),
self.settings.get("timeout", 10), password=self.settings.get("password"),
login=self.settings.get("login"), is_gui=True
password=self.settings.get("password"), )
is_gui=True,
)
if response:
if '^' in response:
self.append_interactive_text(
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
f"Ответ устройства:\n{response}\n"
f"Повторная отправка команды...\n"
)
logging.warning(f"Ошибка в команде: {cmd}. Попытка повторной отправки.")
attempt += 1
time.sleep(1)
continue
else:
self.append_interactive_text(f"[INFO] Ответ устройства:\n{response}\n")
logging.info(f"Получен ответ:\n{response}")
break
else:
self.append_interactive_text("[WARN] Ответ не получен.\n")
logging.warning("Нет ответа от устройства в течение таймаута.")
break
if attempt == max_attempts:
self.append_interactive_text(f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n")
logging.error(f"Команда не выполнена корректно после {max_attempts} попыток: {cmd}")
except SerialException as e: except SerialException as e:
self.append_interactive_text(f"[ERROR] Ошибка при отправке команды: {e}\n") self.append_interactive_text(f"[ERROR] Ошибка при отправке команды: {e}\n")
logging.error(f"Ошибка отправки команды: {e}", exc_info=True) logging.error(f"Ошибка отправки команды: {e}", exc_info=True)
@@ -879,11 +895,11 @@ class SerialAppGUI(tk.Tk):
self.append_interactive_text(f"[ERROR] Неизвестная ошибка: {e}\n") self.append_interactive_text(f"[ERROR] Неизвестная ошибка: {e}\n")
logging.error(f"Неизвестная ошибка: {e}", exc_info=True) logging.error(f"Неизвестная ошибка: {e}", exc_info=True)
# Добавление текста в текстовое поле
def append_interactive_text(self, text): def append_interactive_text(self, text):
self.interactive_text.insert(END, text) append_text_to_widget(self.interactive_text, text)
self.interactive_text.see(END)
# -------------- Вкладка "Выполнить команды из файла" -------------- # Создание вкладки "Выполнить команды из файла"
def create_file_exec_tab(self, frame): def create_file_exec_tab(self, frame):
file_frame = ttk.Frame(frame) file_frame = ttk.Frame(frame)
file_frame.pack(fill=X, pady=5) file_frame.pack(fill=X, pady=5)
@@ -895,11 +911,11 @@ class SerialAppGUI(tk.Tk):
self.file_exec_text = CustomText(frame, wrap="word", height=15) self.file_exec_text = CustomText(frame, wrap="word", height=15)
self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5) self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Выбор файла конфигурации для выполнения команд
def select_config_file_fileexec(self): def select_config_file_fileexec(self):
filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")]) select_config_file(self, self.file_exec_var)
if filename:
self.file_exec_var.set(filename)
# Выполнение команд из файла
def execute_file_commands(self): def execute_file_commands(self):
if not self.settings.get("port"): if not self.settings.get("port"):
messagebox.showerror("Ошибка", "COM-порт не выбран!") messagebox.showerror("Ошибка", "COM-порт не выбран!")
@@ -929,10 +945,9 @@ class SerialAppGUI(tk.Tk):
).start() ).start()
def append_file_exec_text(self, text): def append_file_exec_text(self, text):
self.file_exec_text.insert(END, text) append_text_to_widget(self.file_exec_text, text)
self.file_exec_text.see(END)
# -------------- Вкладка "Редактор конфигурационного файла" -------------- # Создание вкладки "Редактор конфигурационного файла"
def create_config_editor_tab(self, frame): def create_config_editor_tab(self, frame):
top_frame = ttk.Frame(frame) top_frame = ttk.Frame(frame)
top_frame.pack(fill=X, pady=5) top_frame.pack(fill=X, pady=5)
@@ -945,13 +960,11 @@ class SerialAppGUI(tk.Tk):
self.config_editor_text = CustomText(frame, wrap="word") self.config_editor_text = CustomText(frame, wrap="word")
self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5) self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Выбор файла конфигурации для редактирования
def select_config_file_editor(self): def select_config_file_editor(self):
filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")]) select_config_file(self, self.editor_file_var, save_to_settings=True)
if filename:
self.editor_file_var.set(filename)
self.settings["config_file"] = filename
settings_save(self.settings)
# Загрузка файла конфигурации
def load_config_file(self): def load_config_file(self):
filename = self.editor_file_var.get() filename = self.editor_file_var.get()
if not filename or not os.path.exists(filename): if not filename or not os.path.exists(filename):
@@ -967,6 +980,7 @@ class SerialAppGUI(tk.Tk):
logging.error(f"Ошибка загрузки файла: {e}", exc_info=True) logging.error(f"Ошибка загрузки файла: {e}", exc_info=True)
messagebox.showerror("Ошибка", f"Не удалось загрузить файл:\n{e}") messagebox.showerror("Ошибка", f"Не удалось загрузить файл:\n{e}")
# Сохранение файла конфигурации
def save_config_file(self): def save_config_file(self):
filename = self.editor_file_var.get() filename = self.editor_file_var.get()
if not filename: if not filename:
@@ -981,13 +995,14 @@ class SerialAppGUI(tk.Tk):
logging.error(f"Ошибка сохранения файла: {e}", exc_info=True) logging.error(f"Ошибка сохранения файла: {e}", exc_info=True)
messagebox.showerror("Ошибка", f"Не удалось сохранить файл:\n{e}") messagebox.showerror("Ошибка", f"Не удалось сохранить файл:\n{e}")
# Открытие окна "О программе"
def open_about(self): def open_about(self):
about_window = AboutWindow(self) about_window = AboutWindow(self)
about_window.transient(self) about_window.transient(self)
about_window.grab_set() about_window.grab_set()
# Создание вкладки TFTP сервера
def create_tftp_tab(self, frame): def create_tftp_tab(self, frame):
"""Создание вкладки TFTP сервера."""
# Создаем фрейм для управления TFTP сервером # Создаем фрейм для управления TFTP сервером
tftp_frame = ttk.Frame(frame) tftp_frame = ttk.Frame(frame)
tftp_frame.pack(fill=BOTH, expand=True, padx=5, pady=5) tftp_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
@@ -1075,8 +1090,8 @@ class SerialAppGUI(tk.Tk):
self.tftp_server = None self.tftp_server = None
self.tftp_server_thread = None self.tftp_server_thread = None
# Запуск TFTP сервера
def start_tftp_server(self): def start_tftp_server(self):
"""Запуск TFTP сервера."""
try: try:
# Получаем выбранный IP-адрес # Получаем выбранный IP-адрес
ip = self.tftp_ip_var.get() ip = self.tftp_ip_var.get()
@@ -1138,15 +1153,15 @@ class SerialAppGUI(tk.Tk):
self.append_tftp_log(f"[ERROR] Ошибка запуска сервера: {str(e)}") self.append_tftp_log(f"[ERROR] Ошибка запуска сервера: {str(e)}")
messagebox.showerror("Ошибка", f"Не удалось запустить TFTP сервер: {str(e)}") messagebox.showerror("Ошибка", f"Не удалось запустить TFTP сервер: {str(e)}")
# Запуск TFTP сервера в отдельном потоке
def run_tftp_server(self, ip, port): def run_tftp_server(self, ip, port):
"""Запуск TFTP сервера в отдельном потоке."""
try: try:
self.tftp_server.start_server(ip, port) self.tftp_server.start_server(ip, port)
except Exception as e: except Exception as e:
self.append_tftp_log(f"[ERROR] Ошибка работы сервера: {str(e)}") self.append_tftp_log(f"[ERROR] Ошибка работы сервера: {str(e)}")
# Остановка TFTP сервера
def stop_tftp_server(self): def stop_tftp_server(self):
"""Остановка TFTP сервера."""
if self.tftp_server: if self.tftp_server:
try: try:
# Отключаем кнопки на время остановки сервера # Отключаем кнопки на время остановки сервера
@@ -1190,13 +1205,11 @@ class SerialAppGUI(tk.Tk):
self.start_tftp_button.config(state="disabled") self.start_tftp_button.config(state="disabled")
self.stop_tftp_button.config(state="normal") self.stop_tftp_button.config(state="normal")
def append_tftp_log(self, message): def append_tftp_log(self, text):
"""Добавление сообщения в лог TFTP сервера.""" append_text_to_widget(self.tftp_log_text, text)
self.tftp_log_text.insert(END, message + "\n")
self.tftp_log_text.see(END)
# Обновление информации об активных передачах
def update_transfers_info(self): def update_transfers_info(self):
"""Обновление информации об активных передачах."""
if not self.tftp_server: if not self.tftp_server:
return return
@@ -1237,15 +1250,15 @@ class SerialAppGUI(tk.Tk):
f"{elapsed_time:.1f}с" f"{elapsed_time:.1f}с"
)) ))
# Периодическое обновление информации о передачах
def update_transfers_periodically(self): def update_transfers_periodically(self):
"""Периодическое обновление информации о передачах."""
if self.tftp_server and self.tftp_server.running: if self.tftp_server and self.tftp_server.running:
self.update_transfers_info() self.update_transfers_info()
# Планируем следующее обновление через 1 секунду # Планируем следующее обновление через 1 секунду
self.after(1000, self.update_transfers_periodically) self.after(1000, self.update_transfers_periodically)
# Обновление списка сетевых адаптеров
def update_network_adapters(self): def update_network_adapters(self):
"""Обновление списка сетевых адаптеров."""
adapters = get_network_adapters() adapters = get_network_adapters()
self.tftp_ip_combo["values"] = adapters self.tftp_ip_combo["values"] = adapters
if not self.tftp_ip_var.get() in adapters: if not self.tftp_ip_var.get() in adapters:

View File

@@ -69,7 +69,7 @@ class AboutWindow(tk.Toplevel):
ttk.Label( ttk.Label(
contact_frame, contact_frame,
text="Email: LowaWorkMail@gmail.com" text="Email: SPRF555@gmail.com"
).pack(anchor="w") ).pack(anchor="w")
telegram_link = ttk.Label( telegram_link = ttk.Label(

View File

@@ -1,175 +1,165 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import json
import logging import logging
import requests import requests
import threading import threading
import re
from packaging import version from packaging import version
import xml.etree.ElementTree as ET
import html
class UpdateCheckError(Exception): class UpdateCheckError(Exception):
"""Исключение для ошибок проверки обновлений""" """Исключение для ошибок проверки обновлений"""
pass pass
class ReleaseType:
"""Типы релизов"""
STABLE = "stable"
PRERELEASE = "prerelease"
class UpdateChecker: class UpdateChecker:
"""Класс для проверки обновлений программы""" """Класс для проверки обновлений программы"""
def __init__(self, current_version, repo_url): def __init__(self, current_version, repo_url, include_prereleases=False):
self.current_version = current_version self.current_version = current_version
self.repo_url = repo_url self.repo_url = repo_url
# Формируем базовый URL API self.include_prereleases = include_prereleases
self.api_url = repo_url.replace("gitea.filow.ru", "gitea.filow.ru/api/v1/repos/LowaSC/ComConfigCopy") self.rss_url = f"{repo_url}/releases.rss"
self._update_available = False self.release_info = None
self._latest_version = None
self._latest_release = None def _clean_html(self, html_text):
self._error = None """Очищает HTML-разметку и форматирует текст"""
self._changelog = None if not html_text:
return ""
def get_changelog(self, callback=None): text = re.sub(r'<[^>]+>', '', html_text)
""" text = html.unescape(text)
Получение changelog из репозитория. text = re.sub(r'\n\s*\n', '\n\n', text)
:param callback: Функция обратного вызова, которая будет вызвана после получения changelog return '\n'.join(line.strip() for line in text.splitlines()).strip()
"""
def fetch(): def _parse_release_info(self, item):
try: """Извлекает информацию о релизе из RSS item"""
# Пытаемся получить CHANGELOG.md из репозитория title = item.find('title').text if item.find('title') is not None else ''
response = requests.get(f"{self.api_url}/contents/CHANGELOG.md", timeout=10) link = item.find('link').text if item.find('link') is not None else ''
response.raise_for_status() description = item.find('description').text if item.find('description') is not None else ''
content = item.find('{http://purl.org/rss/1.0/modules/content/}encoded')
content = response.json() content_text = content.text if content is not None else ''
if "content" in content:
import base64
changelog_content = base64.b64decode(content["content"]).decode("utf-8")
self._changelog = changelog_content
self._error = None
else:
raise UpdateCheckError("Не удалось получить содержимое CHANGELOG.md")
except requests.RequestException as e:
error_msg = f"Ошибка получения changelog: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
self._changelog = None
except Exception as e:
error_msg = f"Неизвестная ошибка при получении changelog: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
self._changelog = None
finally:
if callback:
callback(self._changelog, self._error)
# Запускаем получение в отдельном потоке # Извлекаем версию и проверяем тип релиза из тега
threading.Thread(target=fetch, daemon=True).start() version_match = re.search(r'/releases/tag/(?:pre-)?v?(\d+\.\d+(?:\.\d+)?)', link)
if not version_match:
return None
version_str = version_match.group(1)
# Проверяем наличие префикса pre- в теге
is_prerelease = 'pre-' in link.lower()
# Форматируем название релиза
formatted_title = title
if title == version_str or not title.strip():
# Если заголовок пустой или совпадает с версией, создаем стандартное название
release_type = "Пре-релиз" if is_prerelease else "Версия"
formatted_title = f"{release_type} {version_str}"
elif not re.search(version_str, title):
# Если версия не указана в заголовке, добавляем её
formatted_title = f"{title} ({version_str})"
# Форматируем описание
formatted_description = self._clean_html(content_text or description)
if not formatted_description.strip():
formatted_description = "Нет описания"
# Добавляем метку типа релиза в начало описания
release_type_label = "[Пре-релиз] " if is_prerelease else ""
formatted_description = f"{release_type_label}{formatted_description}"
return {
'title': formatted_title,
'link': link,
'description': formatted_description,
'version': version_str,
'type': ReleaseType.PRERELEASE if is_prerelease else ReleaseType.STABLE
}
def check_updates(self, callback=None): def check_updates(self, callback=None):
""" """Проверяет наличие обновлений в асинхронном режиме"""
Проверка наличия обновлений. def check_worker():
:param callback: Функция обратного вызова, которая будет вызвана после проверки
"""
def check():
try: try:
response = requests.get(f"{self.api_url}/releases", timeout=10) logging.info(f"Текущая версия программы: {self.current_version}")
logging.info(f"Проверка пре-релизов: {self.include_prereleases}")
logging.info(f"Запрос RSS ленты: {self.rss_url}")
response = requests.get(self.rss_url, timeout=10)
response.raise_for_status() response.raise_for_status()
releases = response.json() root = ET.fromstring(response.content)
if not releases: items = root.findall('.//item')
raise UpdateCheckError("Не найдено релизов в репозитории") if not items:
raise UpdateCheckError("Релизы не найдены")
latest_release = releases[0] logging.info(f"Найдено {len(items)} релизов")
latest_version = latest_release.get("tag_name", "").lstrip("v")
if not latest_version: latest_version = None
raise UpdateCheckError("Не удалось определить версию последнего релиза") latest_info = None
try: for item in items:
if version.parse(latest_version) > version.parse(self.current_version): release_info = self._parse_release_info(item)
self._update_available = True if not release_info:
self._latest_version = latest_version continue
self._latest_release = latest_release
logging.info(f"Доступно обновление: {latest_version}") is_prerelease = release_info['type'] == ReleaseType.PRERELEASE
else: logging.info(
logging.info("Обновления не требуются") f"Проверка релиза: {release_info['title']}, "
except version.InvalidVersion as e: f"версия: {release_info['version']}, "
raise UpdateCheckError(f"Некорректный формат версии: {e}") f"тип: {'пре-релиз' if is_prerelease else 'стабильная'}"
)
self._error = None # Пропускаем пре-релизы если они не включены
if is_prerelease and not self.include_prereleases:
logging.info(f"Пропуск пре-релиза: {release_info['version']}")
continue
except requests.RequestException as e: # Сравниваем версии
error_msg = f"Ошибка сетевого подключения: {e}" try:
logging.error(error_msg, exc_info=True) current_ver = version.parse(latest_version or "0.0.0")
self._error = error_msg new_ver = version.parse(release_info['version'].split('-')[0]) # Убираем суффикс для сравнения
except UpdateCheckError as e:
logging.error(str(e), exc_info=True) if new_ver > current_ver:
self._error = str(e) latest_version = release_info['version']
except Exception as e: latest_info = release_info
error_msg = f"Неизвестная ошибка при проверке обновлений: {e}" logging.info(f"Новая версия: {latest_version}")
logging.error(error_msg, exc_info=True)
self._error = error_msg except version.InvalidVersion as e:
finally: logging.warning(f"Некорректный формат версии {release_info['version']}: {e}")
continue
if not latest_info:
raise UpdateCheckError("Не найдены подходящие версии")
self.release_info = latest_info
# Сравниваем с текущей версией
current_ver = version.parse(self.current_version)
latest_ver = version.parse(latest_version.split('-')[0])
update_available = latest_ver > current_ver
logging.info(f"Сравнение версий: текущая {current_ver} <-> последняя {latest_ver}")
logging.info(f"Доступно обновление: {update_available}")
if callback: if callback:
callback(self._update_available, self._error) callback(update_available, None)
@property except UpdateCheckError as e:
def update_available(self): logging.error(str(e))
"""Доступно ли обновление""" if callback:
return self._update_available callback(False, str(e))
except Exception as e:
@property logging.error(f"Ошибка при проверке обновлений: {e}", exc_info=True)
def latest_version(self): if callback:
"""Последняя доступная версия""" callback(False, str(e))
return self._latest_version
threading.Thread(target=check_worker, daemon=True).start()
@property
def error(self):
"""Последняя ошибка при проверке обновлений"""
return self._error
@property
def changelog(self):
"""Текущий changelog"""
return self._changelog
def get_release_notes(self): def get_release_notes(self):
"""Получение информации о последнем релизе""" """Возвращает информацию о последнем релизе"""
if self._latest_release: return self.release_info
return {
"version": self._latest_version,
"description": self._latest_release.get("body", ""),
"download_url": self._latest_release.get("assets", [{}])[0].get("browser_download_url", "")
}
return None
def get_releases(self, callback=None):
"""
Получение списка релизов из репозитория.
:param callback: Функция обратного вызова, которая будет вызвана после получения списка релизов
"""
def fetch():
try:
response = requests.get(f"{self.api_url}/releases", timeout=10)
response.raise_for_status()
releases = response.json()
if not releases:
raise UpdateCheckError("Не найдено релизов в репозитории")
self._error = None
if callback:
callback(releases, None)
except requests.RequestException as e:
error_msg = f"Ошибка сетевого подключения: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
if callback:
callback(None, error_msg)
except Exception as e:
error_msg = f"Ошибка при получении списка релизов: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
if callback:
callback(None, error_msg)
# Запускаем получение в отдельном потоке
threading.Thread(target=fetch, daemon=True).start()