diff --git a/ComConfigCopy.py b/ComConfigCopy.py index a679b2c..3c50979 100644 --- a/ComConfigCopy.py +++ b/ComConfigCopy.py @@ -5,11 +5,6 @@ # Это программа для копирования конфигураций на коммутаторы # ------------------------------------------------------------ - -# import argparse Использовался для получения аргументов из командной строки (не используется) -# import platform Использовался для получения списка сетевых адаптеров (не используется) -# import subprocess Использовался для получения списка сетевых адаптеров (не используется) -# import socket не используется import json import logging import os @@ -39,12 +34,11 @@ import serial.tools.list_ports from serial.serialutil import SerialException from about_window import AboutWindow from TFTPServer import TFTPServer -# from TFTPServer import TFTPServerThread import socket from update_checker import UpdateChecker # Версия программы -VERSION = "1.0.0" +VERSION = "1.0.1" # Создаем необходимые папки os.makedirs("Logs", exist_ok=True) @@ -56,12 +50,8 @@ os.makedirs("docs", exist_ok=True) # Файл настроек находится в папке Settings SETTINGS_FILE = os.path.join("Settings", "settings.json") -# ========================== -# Функции работы с настройками и логированием -# ========================== - +# Настройка логирования с использованием RotatingFileHandler. def setup_logging(): - """Настройка логирования с использованием RotatingFileHandler.""" logger = logging.getLogger() logger.setLevel(logging.DEBUG) log_path = os.path.join("Logs", "app.log") @@ -72,8 +62,8 @@ def setup_logging(): handler.setFormatter(formatter) logger.addHandler(handler) +# Загрузка настроек из JSON-файла или создание настроек по умолчанию. def settings_load(): - """Загрузка настроек из JSON-файла или создание настроек по умолчанию.""" default_settings = { "port": None, # Порт для подключения "baudrate": 9600, # Скорость передачи данных @@ -126,8 +116,8 @@ def settings_load(): logging.error(f"Ошибка загрузки файла настроек: {e}", exc_info=True) return default_settings +# Сохранение настроек в JSON-файл def settings_save(settings): - """Сохранение настроек в JSON-файл.""" try: with open(SETTINGS_FILE, "w", encoding="utf-8") as f: json.dump(settings, f, indent=4, ensure_ascii=False) @@ -135,18 +125,15 @@ def settings_save(settings): except Exception as e: logging.error(f"Ошибка при сохранении настроек: {e}", exc_info=True) +# Получение списка доступных последовательных портов def list_serial_ports(): """Получение списка доступных последовательных портов.""" ports = serial.tools.list_ports.comports() logging.debug(f"Найдено {len(ports)} серийных портов.") return [port.device for port in ports] -# ========================== -# Функции работы с сетевыми адаптерами (не используются) -# ========================== - +# Получение списка IP-адресов из сетевых адаптеров def get_network_adapters(): - """Получение списка сетевых адаптеров и их IP-адресов.""" adapters = [] try: # Получаем имя хоста @@ -177,12 +164,8 @@ def get_network_adapters(): return adapters -# ========================== -# Функции работы с COM-соединением -# ========================== - +# Создание соединения с устройством через последовательный порт def create_connection(settings): - """Создание соединения с устройством через последовательный порт.""" try: conn = serial.Serial(settings["port"], settings["baudrate"], timeout=1) logging.info(f"Соединение установлено с {settings['port']} на {settings['baudrate']}.") @@ -194,12 +177,8 @@ def create_connection(settings): logging.error(f"Неизвестная ошибка подключения: {e}", exc_info=True) return None - - # Проверка наличия логина и пароля в настройках и отправка их на устройство - def send_login_password(serial_connection, login=None, password=None, is_gui=False): - """Отправка логина и пароля на устройство.""" if not login: if is_gui: login = simpledialog.askstring("Login", "Введите логин:") @@ -216,14 +195,8 @@ def send_login_password(serial_connection, login=None, password=None, is_gui=Fal else: password = getpass("Введите пароль: ") - # Чтение ответа от устройства с учётом таймаута. - def read_response(serial_connection, timeout, login=None, password=None, is_gui=False): - """ - Чтение ответа от устройства с учётом таймаута. - Если в последней строке появляется запрос логина или пароля, данные отправляются автоматически. - """ response = b"" end_time = time.time() + timeout decoded = "" @@ -260,8 +233,8 @@ def read_response(serial_connection, timeout, login=None, password=None, is_gui= time.sleep(0.1) return decoded +# Генерация блоков команд для блочного копирования def generate_command_blocks(lines, block_size): - """Генерация блоков команд для блочного копирования.""" blocks = [] current_block = [] for line in lines: @@ -287,6 +260,7 @@ def generate_command_blocks(lines, block_size): blocks.append("\n".join(current_block)) return blocks +# Выполнение команд из файла конфигурации def execute_commands_from_file( serial_connection, filename, @@ -298,12 +272,6 @@ def execute_commands_from_file( password=None, is_gui=False, ): - """ - Выполнение команд из файла конфигурации. - Если передан log_callback, вывод будет отображаться в GUI. - Теперь при обнаружении ошибки (например, если в ответе присутствует символ '^') - команда будет отправляться повторно. - """ try: with open(filename, "r", encoding="utf-8") as file: lines = [line for line in file if line.strip()] @@ -442,18 +410,16 @@ def execute_commands_from_file( log_callback(msg) logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True) -# ========================== -# Улучшенные текстовые виджеты -# ========================== - -class CustomText(tk.Text): - """Улучшенный текстовый виджет с расширенной функциональностью копирования/вставки""" - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) +# Базовый класс для кастомных виджетов с поддержкой контекстного меню и горячих клавиш +class CustomWidgetBase: + + def __init__(self): self.create_context_menu() self.bind_shortcuts() - + + # Создание контекстного меню def create_context_menu(self): + """Создание контекстного меню""" self.context_menu = tk.Menu(self, tearoff=0) self.context_menu.add_command(label="Вырезать", command=self.cut) self.context_menu.add_command(label="Копировать", command=self.copy) @@ -462,90 +428,63 @@ class CustomText(tk.Text): self.context_menu.add_command(label="Выделить всё", command=self.select_all) self.bind("", self.show_context_menu) - - def show_context_menu(self, event): - self.context_menu.post(event.x_root, event.y_root) - + + # Привязка горячих клавиш def bind_shortcuts(self): + """Привязка горячих клавиш""" # Стандартные сочетания self.bind("", lambda e: self.event_generate("<>")) self.bind("", lambda e: self.event_generate("<>")) self.bind("", lambda e: self.event_generate("<>")) self.bind("", self.select_all) - # Shift+Insert для вставки + # Дополнительные сочетания self.bind("", lambda e: self.event_generate("<>")) - - # Ctrl+Insert для копирования self.bind("", lambda e: self.event_generate("<>")) - - # Shift+Delete для вырезания self.bind("", lambda e: self.event_generate("<>")) - + + # Отображение контекстного меню + def show_context_menu(self, event): + """Отображение контекстного меню""" + self.context_menu.post(event.x_root, event.y_root) + + # Вырезание текста def cut(self): self.event_generate("<>") - + + # Копирование текста def copy(self): self.event_generate("<>") - + + # Вставка текста def paste(self): self.event_generate("<>") - + +# Класс для текстового поля с поддержкой контекстного меню и горячих клавиш +class CustomText(tk.Text, CustomWidgetBase): + def __init__(self, *args, **kwargs): + tk.Text.__init__(self, *args, **kwargs) + CustomWidgetBase.__init__(self) + def select_all(self, event=None): + """Выделение всего текста в Text виджете""" self.tag_add(tk.SEL, "1.0", tk.END) self.mark_set(tk.INSERT, "1.0") self.see(tk.INSERT) return "break" -class CustomEntry(ttk.Entry): - """Улучшенное поле ввода с расширенной функциональностью копирования/вставки""" +# Класс для поля ввода с поддержкой контекстного меню и горячих клавиш +class CustomEntry(ttk.Entry, CustomWidgetBase): def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.create_context_menu() - self.bind_shortcuts() - - def create_context_menu(self): - self.context_menu = tk.Menu(self, tearoff=0) - self.context_menu.add_command(label="Вырезать", command=self.cut) - self.context_menu.add_command(label="Копировать", command=self.copy) - self.context_menu.add_command(label="Вставить", command=self.paste) - self.context_menu.add_separator() - self.context_menu.add_command(label="Выделить всё", command=self.select_all) - - self.bind("", self.show_context_menu) - - def show_context_menu(self, event): - self.context_menu.post(event.x_root, event.y_root) - - def bind_shortcuts(self): - # Стандартные сочетания - self.bind("", lambda e: self.event_generate("<>")) - self.bind("", lambda e: self.event_generate("<>")) - self.bind("", lambda e: self.event_generate("<>")) - self.bind("", self.select_all) - - # Shift+Insert для вставки - self.bind("", lambda e: self.event_generate("<>")) - - # Ctrl+Insert для копирования - self.bind("", lambda e: self.event_generate("<>")) - - # Shift+Delete для вырезания - self.bind("", lambda e: self.event_generate("<>")) - - def cut(self): - self.event_generate("<>") - - def copy(self): - self.event_generate("<>") - - def paste(self): - self.event_generate("<>") - + ttk.Entry.__init__(self, *args, **kwargs) + CustomWidgetBase.__init__(self) + def select_all(self, event=None): + """Выделение всего текста в Entry виджете""" self.select_range(0, tk.END) return "break" +# Класс для окна настроек class SettingsWindow(tk.Toplevel): def __init__(self, parent, settings, callback=None): super().__init__(parent) @@ -585,15 +524,16 @@ class SettingsWindow(tk.Toplevel): copy_mode_frame = ttk.Frame(settings_frame) copy_mode_frame.grid(row=3, column=1, sticky=W, pady=5) ttk.Radiobutton(copy_mode_frame, text="Построчно", value="line", - variable=self.copy_mode_var).pack(side=LEFT) + variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT) ttk.Radiobutton(copy_mode_frame, text="Блоками", value="block", - variable=self.copy_mode_var).pack(side=LEFT) + variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT) # Размер блока - ttk.Label(settings_frame, text="Размер блока:").grid(row=4, column=0, sticky=W, pady=5) + self.block_size_label = ttk.Label(settings_frame, text="Размер блока:") + self.block_size_label.grid(row=4, column=0, sticky=W, pady=5) self.block_size_var = StringVar(value=str(settings.get("block_size", 15))) - block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var) - block_size_entry.grid(row=4, column=1, sticky=W, pady=5) + self.block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var) + self.block_size_entry.grid(row=4, column=1, sticky=W, pady=5) # Приглашение командной строки ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5) @@ -609,9 +549,22 @@ class SettingsWindow(tk.Toplevel): self.update_ports() + # Инициализация видимости поля размера блока + self.toggle_block_size() + # Центрируем окно self.center_window() - + + # Переключение видимости поля размера блока + def toggle_block_size(self): + if self.copy_mode_var.get() == "line": + self.block_size_label.grid_remove() + self.block_size_entry.grid_remove() + else: + self.block_size_label.grid() + self.block_size_entry.grid() + + # Центрирование окна def center_window(self): self.update_idletasks() width = self.winfo_width() @@ -620,12 +573,14 @@ class SettingsWindow(tk.Toplevel): y = (self.winfo_screenheight() // 2) - (height // 2) self.geometry(f"{width}x{height}+{x}+{y}") + # Обновление списка доступных последовательных портов def update_ports(self): ports = list_serial_ports() self.port_combo["values"] = ports if ports and not self.port_var.get(): self.port_var.set(ports[0]) + # Сохранение настроек def save_settings(self): try: self.settings.update({ @@ -644,6 +599,78 @@ class SettingsWindow(tk.Toplevel): except ValueError as e: messagebox.showerror("Ошибка", "Проверьте правильность введенных значений") +# Общая функция для добавления текста в текстовое поле +def append_text_to_widget(widget, text): + # Проверяем, заканчивается ли текст символом новой строки + if not text.endswith('\n'): + text += '\n' + widget.insert(END, text) + widget.see(END) + +# Общая функция для выбора файла конфигурации +def select_config_file(self, var, save_to_settings=False): + filename = filedialog.askopenfilename( + title="Выберите файл конфигурации", + filetypes=[("Text files", "*.txt")] + ) + if filename: + var.set(filename) + if save_to_settings: + self.settings["config_file"] = filename + settings_save(self.settings) + +# Общая функция для отправки команды и обработки ответа +def send_command_and_process_response( + serial_connection, + cmd, + timeout, + max_attempts=3, + log_callback=None, + login=None, + password=None, + is_gui=False +): + attempt = 0 + while attempt < max_attempts: + msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n" + if log_callback: + log_callback(msg) + serial_connection.write((cmd + "\n").encode()) + logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})") + response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui) + if response: + if '^' in response: + msg = ( + f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n" + f"Ответ устройства:\n{response}\n" + f"Повторная отправка команды...\n" + ) + if log_callback: + log_callback(msg) + logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.") + attempt += 1 + time.sleep(1) + continue + else: + msg = f"Ответ устройства:\n{response}\n" + if log_callback: + log_callback(msg) + logging.info(f"Ответ устройства:\n{response}") + return True, response + else: + msg = f"Ответ не получен для команды: {cmd}\n" + if log_callback: + log_callback(msg) + logging.warning(f"Нет ответа для команды: {cmd}") + return False, None + + msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n" + if log_callback: + log_callback(msg) + logging.error(msg) + return False, None + +# Основной класс для графического интерфейса class SerialAppGUI(tk.Tk): def __init__(self, settings): super().__init__() @@ -656,7 +683,8 @@ class SerialAppGUI(tk.Tk): # Инициализация проверки обновлений self.update_checker = UpdateChecker( VERSION, - "https://gitea.filow.ru/LowaSC/ComConfigCopy" + "https://gitea.filow.ru/LowaSC/ComConfigCopy", + include_prereleases=False # Проверяем только стабильные версии ) # Настройка стиля @@ -675,6 +703,7 @@ class SerialAppGUI(tk.Tk): # Проверка первого запуска self.check_first_run() + # Создание меню def create_menu(self): menubar = tk.Menu(self) self.config(menu=menubar) @@ -693,8 +722,8 @@ class SerialAppGUI(tk.Tk): help_menu.add_separator() help_menu.add_command(label="О программе", command=self.open_about) + # Проверка наличия обновлений def check_for_updates(self): - """Проверка наличия обновлений""" def on_update_check(update_available, error): if error: messagebox.showerror( @@ -704,14 +733,20 @@ class SerialAppGUI(tk.Tk): elif update_available: release_info = self.update_checker.get_release_notes() if release_info: + # Форматируем сообщение + message = ( + f"Доступна новая версия {release_info['version']}!\n\n" + f"Тип релиза: {'Пре-релиз' if release_info['type'] == 'prerelease' else 'Стабильный'}\n\n" + "Изменения:\n" + f"{release_info['description']}\n\n" + "Хотите перейти на страницу загрузки?" + ) response = messagebox.askyesno( "Доступно обновление", - f"Доступна новая версия {release_info['version']}!\n\n" - f"Изменения:\n{release_info['description']}\n\n" - "Хотите перейти на страницу загрузки?", + message, ) if response: - webbrowser.open(release_info["download_url"]) + webbrowser.open(release_info["link"]) else: messagebox.showerror( "Ошибка", @@ -725,18 +760,18 @@ class SerialAppGUI(tk.Tk): self.update_checker.check_updates(callback=on_update_check) + # Обработчик изменения настроек def on_settings_changed(self): - """Обработчик изменения настроек""" self.settings = settings_load() + # Открытие окна настроек def open_settings(self): - """Открытие окна настроек""" settings_window = SettingsWindow(self, self.settings, self.on_settings_changed) settings_window.transient(self) settings_window.grab_set() + # Проверка первого запуска программы def check_first_run(self): - """Проверка первого запуска программы""" show_settings = False # Проверяем существование файла настроек @@ -765,6 +800,7 @@ class SerialAppGUI(tk.Tk): if response: self.open_settings() + # Создание вкладок def create_tabs(self): self.notebook = ttk.Notebook(self) self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5) @@ -785,7 +821,7 @@ class SerialAppGUI(tk.Tk): self.create_config_editor_tab(config_editor_frame) self.create_tftp_tab(tftp_frame) - # -------------- Вкладка "Интерактивный режим" -------------- + # Создание вкладки "Интерактивный режим" def create_interactive_tab(self, frame): control_frame = ttk.Frame(frame) control_frame.pack(fill=X, pady=5) @@ -802,6 +838,7 @@ class SerialAppGUI(tk.Tk): self.command_entry.pack(side=LEFT, padx=5) ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5) + # Подключение к устройству def connect_device(self): if self.connection: messagebox.showinfo("Информация", "Уже подключено.") @@ -815,6 +852,7 @@ class SerialAppGUI(tk.Tk): else: self.append_interactive_text("[ERROR] Не удалось установить соединение.\n") + # Отключение от устройства def disconnect_device(self): if self.connection: try: @@ -826,6 +864,7 @@ class SerialAppGUI(tk.Tk): else: messagebox.showinfo("Информация", "Соединение не установлено.") + # Отправка команды def send_command(self): if not self.connection: messagebox.showerror("Ошибка", "Сначала установите соединение!") @@ -836,42 +875,19 @@ class SerialAppGUI(tk.Tk): self.append_interactive_text(f"[INFO] Отправка команды: {cmd}\n") threading.Thread(target=self.process_command, args=(cmd,), daemon=True).start() + # Обработка команды def process_command(self, cmd): try: - max_attempts = 3 - attempt = 0 - while attempt < max_attempts: - self.connection.write((cmd + "\n").encode()) - logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})") - response = read_response( - self.connection, - self.settings.get("timeout", 10), - login=self.settings.get("login"), - password=self.settings.get("password"), - is_gui=True, - ) - if response: - if '^' in response: - self.append_interactive_text( - f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n" - f"Ответ устройства:\n{response}\n" - f"Повторная отправка команды...\n" - ) - logging.warning(f"Ошибка в команде: {cmd}. Попытка повторной отправки.") - attempt += 1 - time.sleep(1) - continue - else: - self.append_interactive_text(f"[INFO] Ответ устройства:\n{response}\n") - logging.info(f"Получен ответ:\n{response}") - break - else: - self.append_interactive_text("[WARN] Ответ не получен.\n") - logging.warning("Нет ответа от устройства в течение таймаута.") - break - if attempt == max_attempts: - self.append_interactive_text(f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n") - logging.error(f"Команда не выполнена корректно после {max_attempts} попыток: {cmd}") + success, response = send_command_and_process_response( + self.connection, + cmd, + self.settings.get("timeout", 10), + max_attempts=3, + log_callback=self.append_interactive_text, + login=self.settings.get("login"), + password=self.settings.get("password"), + is_gui=True + ) except SerialException as e: self.append_interactive_text(f"[ERROR] Ошибка при отправке команды: {e}\n") logging.error(f"Ошибка отправки команды: {e}", exc_info=True) @@ -879,11 +895,11 @@ class SerialAppGUI(tk.Tk): self.append_interactive_text(f"[ERROR] Неизвестная ошибка: {e}\n") logging.error(f"Неизвестная ошибка: {e}", exc_info=True) + # Добавление текста в текстовое поле def append_interactive_text(self, text): - self.interactive_text.insert(END, text) - self.interactive_text.see(END) + append_text_to_widget(self.interactive_text, text) - # -------------- Вкладка "Выполнить команды из файла" -------------- + # Создание вкладки "Выполнить команды из файла" def create_file_exec_tab(self, frame): file_frame = ttk.Frame(frame) file_frame.pack(fill=X, pady=5) @@ -895,11 +911,11 @@ class SerialAppGUI(tk.Tk): self.file_exec_text = CustomText(frame, wrap="word", height=15) self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5) + # Выбор файла конфигурации для выполнения команд def select_config_file_fileexec(self): - filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")]) - if filename: - self.file_exec_var.set(filename) + select_config_file(self, self.file_exec_var) + # Выполнение команд из файла def execute_file_commands(self): if not self.settings.get("port"): messagebox.showerror("Ошибка", "COM-порт не выбран!") @@ -929,10 +945,9 @@ class SerialAppGUI(tk.Tk): ).start() def append_file_exec_text(self, text): - self.file_exec_text.insert(END, text) - self.file_exec_text.see(END) + append_text_to_widget(self.file_exec_text, text) - # -------------- Вкладка "Редактор конфигурационного файла" -------------- + # Создание вкладки "Редактор конфигурационного файла" def create_config_editor_tab(self, frame): top_frame = ttk.Frame(frame) top_frame.pack(fill=X, pady=5) @@ -945,13 +960,11 @@ class SerialAppGUI(tk.Tk): self.config_editor_text = CustomText(frame, wrap="word") self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5) + # Выбор файла конфигурации для редактирования def select_config_file_editor(self): - filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")]) - if filename: - self.editor_file_var.set(filename) - self.settings["config_file"] = filename - settings_save(self.settings) + select_config_file(self, self.editor_file_var, save_to_settings=True) + # Загрузка файла конфигурации def load_config_file(self): filename = self.editor_file_var.get() if not filename or not os.path.exists(filename): @@ -967,6 +980,7 @@ class SerialAppGUI(tk.Tk): logging.error(f"Ошибка загрузки файла: {e}", exc_info=True) messagebox.showerror("Ошибка", f"Не удалось загрузить файл:\n{e}") + # Сохранение файла конфигурации def save_config_file(self): filename = self.editor_file_var.get() if not filename: @@ -981,13 +995,14 @@ class SerialAppGUI(tk.Tk): logging.error(f"Ошибка сохранения файла: {e}", exc_info=True) messagebox.showerror("Ошибка", f"Не удалось сохранить файл:\n{e}") + # Открытие окна "О программе" def open_about(self): about_window = AboutWindow(self) about_window.transient(self) about_window.grab_set() + # Создание вкладки TFTP сервера def create_tftp_tab(self, frame): - """Создание вкладки TFTP сервера.""" # Создаем фрейм для управления TFTP сервером tftp_frame = ttk.Frame(frame) tftp_frame.pack(fill=BOTH, expand=True, padx=5, pady=5) @@ -1075,8 +1090,8 @@ class SerialAppGUI(tk.Tk): self.tftp_server = None self.tftp_server_thread = None + # Запуск TFTP сервера def start_tftp_server(self): - """Запуск TFTP сервера.""" try: # Получаем выбранный IP-адрес ip = self.tftp_ip_var.get() @@ -1138,15 +1153,15 @@ class SerialAppGUI(tk.Tk): self.append_tftp_log(f"[ERROR] Ошибка запуска сервера: {str(e)}") messagebox.showerror("Ошибка", f"Не удалось запустить TFTP сервер: {str(e)}") + # Запуск TFTP сервера в отдельном потоке def run_tftp_server(self, ip, port): - """Запуск TFTP сервера в отдельном потоке.""" try: self.tftp_server.start_server(ip, port) except Exception as e: self.append_tftp_log(f"[ERROR] Ошибка работы сервера: {str(e)}") + # Остановка TFTP сервера def stop_tftp_server(self): - """Остановка TFTP сервера.""" if self.tftp_server: try: # Отключаем кнопки на время остановки сервера @@ -1190,13 +1205,11 @@ class SerialAppGUI(tk.Tk): self.start_tftp_button.config(state="disabled") self.stop_tftp_button.config(state="normal") - def append_tftp_log(self, message): - """Добавление сообщения в лог TFTP сервера.""" - self.tftp_log_text.insert(END, message + "\n") - self.tftp_log_text.see(END) + def append_tftp_log(self, text): + append_text_to_widget(self.tftp_log_text, text) + # Обновление информации об активных передачах def update_transfers_info(self): - """Обновление информации об активных передачах.""" if not self.tftp_server: return @@ -1237,15 +1250,15 @@ class SerialAppGUI(tk.Tk): f"{elapsed_time:.1f}с" )) + # Периодическое обновление информации о передачах def update_transfers_periodically(self): - """Периодическое обновление информации о передачах.""" if self.tftp_server and self.tftp_server.running: self.update_transfers_info() # Планируем следующее обновление через 1 секунду self.after(1000, self.update_transfers_periodically) + # Обновление списка сетевых адаптеров def update_network_adapters(self): - """Обновление списка сетевых адаптеров.""" adapters = get_network_adapters() self.tftp_ip_combo["values"] = adapters if not self.tftp_ip_var.get() in adapters: diff --git a/about_window.py b/about_window.py index 1d3240e..911ced4 100644 --- a/about_window.py +++ b/about_window.py @@ -69,7 +69,7 @@ class AboutWindow(tk.Toplevel): ttk.Label( contact_frame, - text="Email: LowaWorkMail@gmail.com" + text="Email: SPRF555@gmail.com" ).pack(anchor="w") telegram_link = ttk.Label( diff --git a/update_checker.py b/update_checker.py index 137bf09..c1c89a5 100644 --- a/update_checker.py +++ b/update_checker.py @@ -1,175 +1,165 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import json import logging import requests import threading +import re from packaging import version +import xml.etree.ElementTree as ET +import html class UpdateCheckError(Exception): """Исключение для ошибок проверки обновлений""" pass +class ReleaseType: + """Типы релизов""" + STABLE = "stable" + PRERELEASE = "prerelease" + class UpdateChecker: """Класс для проверки обновлений программы""" - def __init__(self, current_version, repo_url): + def __init__(self, current_version, repo_url, include_prereleases=False): self.current_version = current_version self.repo_url = repo_url - # Формируем базовый URL API - self.api_url = repo_url.replace("gitea.filow.ru", "gitea.filow.ru/api/v1/repos/LowaSC/ComConfigCopy") - self._update_available = False - self._latest_version = None - self._latest_release = None - self._error = None - self._changelog = None - - def get_changelog(self, callback=None): - """ - Получение changelog из репозитория. - :param callback: Функция обратного вызова, которая будет вызвана после получения changelog - """ - def fetch(): - try: - # Пытаемся получить CHANGELOG.md из репозитория - response = requests.get(f"{self.api_url}/contents/CHANGELOG.md", timeout=10) - response.raise_for_status() - - content = response.json() - if "content" in content: - import base64 - changelog_content = base64.b64decode(content["content"]).decode("utf-8") - self._changelog = changelog_content - self._error = None - else: - raise UpdateCheckError("Не удалось получить содержимое CHANGELOG.md") - - except requests.RequestException as e: - error_msg = f"Ошибка получения changelog: {e}" - logging.error(error_msg, exc_info=True) - self._error = error_msg - self._changelog = None - except Exception as e: - error_msg = f"Неизвестная ошибка при получении changelog: {e}" - logging.error(error_msg, exc_info=True) - self._error = error_msg - self._changelog = None - finally: - if callback: - callback(self._changelog, self._error) + self.include_prereleases = include_prereleases + self.rss_url = f"{repo_url}/releases.rss" + self.release_info = None + + def _clean_html(self, html_text): + """Очищает HTML-разметку и форматирует текст""" + if not html_text: + return "" + text = re.sub(r'<[^>]+>', '', html_text) + text = html.unescape(text) + text = re.sub(r'\n\s*\n', '\n\n', text) + return '\n'.join(line.strip() for line in text.splitlines()).strip() + + def _parse_release_info(self, item): + """Извлекает информацию о релизе из RSS item""" + title = item.find('title').text if item.find('title') is not None else '' + link = item.find('link').text if item.find('link') is not None else '' + description = item.find('description').text if item.find('description') is not None else '' + content = item.find('{http://purl.org/rss/1.0/modules/content/}encoded') + content_text = content.text if content is not None else '' - # Запускаем получение в отдельном потоке - threading.Thread(target=fetch, daemon=True).start() - + # Извлекаем версию и проверяем тип релиза из тега + version_match = re.search(r'/releases/tag/(?:pre-)?v?(\d+\.\d+(?:\.\d+)?)', link) + if not version_match: + return None + + version_str = version_match.group(1) + # Проверяем наличие префикса pre- в теге + is_prerelease = 'pre-' in link.lower() + + # Форматируем название релиза + formatted_title = title + if title == version_str or not title.strip(): + # Если заголовок пустой или совпадает с версией, создаем стандартное название + release_type = "Пре-релиз" if is_prerelease else "Версия" + formatted_title = f"{release_type} {version_str}" + elif not re.search(version_str, title): + # Если версия не указана в заголовке, добавляем её + formatted_title = f"{title} ({version_str})" + + # Форматируем описание + formatted_description = self._clean_html(content_text or description) + if not formatted_description.strip(): + formatted_description = "Нет описания" + + # Добавляем метку типа релиза в начало описания + release_type_label = "[Пре-релиз] " if is_prerelease else "" + formatted_description = f"{release_type_label}{formatted_description}" + + return { + 'title': formatted_title, + 'link': link, + 'description': formatted_description, + 'version': version_str, + 'type': ReleaseType.PRERELEASE if is_prerelease else ReleaseType.STABLE + } + def check_updates(self, callback=None): - """ - Проверка наличия обновлений. - :param callback: Функция обратного вызова, которая будет вызвана после проверки - """ - def check(): + """Проверяет наличие обновлений в асинхронном режиме""" + def check_worker(): try: - response = requests.get(f"{self.api_url}/releases", timeout=10) + logging.info(f"Текущая версия программы: {self.current_version}") + logging.info(f"Проверка пре-релизов: {self.include_prereleases}") + logging.info(f"Запрос RSS ленты: {self.rss_url}") + + response = requests.get(self.rss_url, timeout=10) response.raise_for_status() - releases = response.json() - if not releases: - raise UpdateCheckError("Не найдено релизов в репозитории") + root = ET.fromstring(response.content) + items = root.findall('.//item') + if not items: + raise UpdateCheckError("Релизы не найдены") - latest_release = releases[0] - latest_version = latest_release.get("tag_name", "").lstrip("v") + logging.info(f"Найдено {len(items)} релизов") - if not latest_version: - raise UpdateCheckError("Не удалось определить версию последнего релиза") + latest_version = None + latest_info = None - try: - if version.parse(latest_version) > version.parse(self.current_version): - self._update_available = True - self._latest_version = latest_version - self._latest_release = latest_release - logging.info(f"Доступно обновление: {latest_version}") - else: - logging.info("Обновления не требуются") - except version.InvalidVersion as e: - raise UpdateCheckError(f"Некорректный формат версии: {e}") + for item in items: + release_info = self._parse_release_info(item) + if not release_info: + continue + + is_prerelease = release_info['type'] == ReleaseType.PRERELEASE + logging.info( + f"Проверка релиза: {release_info['title']}, " + f"версия: {release_info['version']}, " + f"тип: {'пре-релиз' if is_prerelease else 'стабильная'}" + ) - self._error = None + # Пропускаем пре-релизы если они не включены + if is_prerelease and not self.include_prereleases: + logging.info(f"Пропуск пре-релиза: {release_info['version']}") + continue - except requests.RequestException as e: - error_msg = f"Ошибка сетевого подключения: {e}" - logging.error(error_msg, exc_info=True) - self._error = error_msg - except UpdateCheckError as e: - logging.error(str(e), exc_info=True) - self._error = str(e) - except Exception as e: - error_msg = f"Неизвестная ошибка при проверке обновлений: {e}" - logging.error(error_msg, exc_info=True) - self._error = error_msg - finally: + # Сравниваем версии + try: + current_ver = version.parse(latest_version or "0.0.0") + new_ver = version.parse(release_info['version'].split('-')[0]) # Убираем суффикс для сравнения + + if new_ver > current_ver: + latest_version = release_info['version'] + latest_info = release_info + logging.info(f"Новая версия: {latest_version}") + + except version.InvalidVersion as e: + logging.warning(f"Некорректный формат версии {release_info['version']}: {e}") + continue + + if not latest_info: + raise UpdateCheckError("Не найдены подходящие версии") + + self.release_info = latest_info + + # Сравниваем с текущей версией + current_ver = version.parse(self.current_version) + latest_ver = version.parse(latest_version.split('-')[0]) + update_available = latest_ver > current_ver + + logging.info(f"Сравнение версий: текущая {current_ver} <-> последняя {latest_ver}") + logging.info(f"Доступно обновление: {update_available}") + if callback: - callback(self._update_available, self._error) - - @property - def update_available(self): - """Доступно ли обновление""" - return self._update_available - - @property - def latest_version(self): - """Последняя доступная версия""" - return self._latest_version - - @property - def error(self): - """Последняя ошибка при проверке обновлений""" - return self._error - - @property - def changelog(self): - """Текущий changelog""" - return self._changelog + callback(update_available, None) + + except UpdateCheckError as e: + logging.error(str(e)) + if callback: + callback(False, str(e)) + except Exception as e: + logging.error(f"Ошибка при проверке обновлений: {e}", exc_info=True) + if callback: + callback(False, str(e)) + + threading.Thread(target=check_worker, daemon=True).start() def get_release_notes(self): - """Получение информации о последнем релизе""" - if self._latest_release: - return { - "version": self._latest_version, - "description": self._latest_release.get("body", ""), - "download_url": self._latest_release.get("assets", [{}])[0].get("browser_download_url", "") - } - return None - - def get_releases(self, callback=None): - """ - Получение списка релизов из репозитория. - :param callback: Функция обратного вызова, которая будет вызвана после получения списка релизов - """ - def fetch(): - try: - response = requests.get(f"{self.api_url}/releases", timeout=10) - response.raise_for_status() - releases = response.json() - - if not releases: - raise UpdateCheckError("Не найдено релизов в репозитории") - - self._error = None - if callback: - callback(releases, None) - - except requests.RequestException as e: - error_msg = f"Ошибка сетевого подключения: {e}" - logging.error(error_msg, exc_info=True) - self._error = error_msg - if callback: - callback(None, error_msg) - except Exception as e: - error_msg = f"Ошибка при получении списка релизов: {e}" - logging.error(error_msg, exc_info=True) - self._error = error_msg - if callback: - callback(None, error_msg) - - # Запускаем получение в отдельном потоке - threading.Thread(target=fetch, daemon=True).start() \ No newline at end of file + """Возвращает информацию о последнем релизе""" + return self.release_info \ No newline at end of file