17 Commits

Author SHA1 Message Date
f5935d6b8f Merge pull request 'v1.0.1' (#4) from v1.0.1 into main
Reviewed-on: #4
2025-02-16 22:02:20 +00:00
1a511ff54f Enhance update checking
- Refactor UpdateChecker
- Add support for parsing release details with improved formatting
- Implement more robust version comparison and release type handling
- Add logging for update checking process
- Improve error handling and release information extraction
- Update update checking logic to handle stable and pre-release versions
2025-02-17 00:53:24 +03:00
f84e20631b Update contact email in README.md
- Change contact email from LowaWorkMail@gmail.com to SPRF555@gmail.com
2025-02-17 00:11:10 +03:00
4a67e70a92 Обновить README.md 2025-02-16 20:52:02 +00:00
12562e615f Bump version to 1.0.1
- Update application version number in ComConfigCopy.py
- Minor version increment to reflect recent changes
2025-02-16 23:48:59 +03:00
7ebeb52808 Refactor and optimize code structure with base widget and utility functions
- Create CustomWidgetBase for shared context menu and shortcut functionality
- Add utility functions for common tasks like text appending and file selection
- Simplify command processing with a generic send_command_and_process_response function
- Remove redundant comments and unused imports
- Improve code organization and readability
- Enhance modularity of custom widgets and utility methods
2025-02-16 23:45:19 +03:00
a140b7d8a0 Clean up .gitignore file
- Remove specific config and test files from tracking
- Add Configs/ directory to ignore list
- Remove unnecessary icon and test script entries
2025-02-16 05:36:53 +03:00
2c9edcd859 Remove outdated ComConfigCopy executable binary 2025-02-16 05:35:35 +03:00
5a00efd175 Update executable binary after feature removal 2025-02-16 05:08:44 +03:00
2f4b2985cd Remove documentation and CLI mode features
- Remove "Documentation" menu option
- Delete unused CLI mode code
- Clean up commented-out argument parsing function
2025-02-16 05:00:43 +03:00
d1a870fed7 Add update checking and documentation features
- Implement UpdateChecker for version comparison and update notifications
- Add menu options for documentation and update checking
- Enhance AboutWindow with dynamic version display
- Update requirements.txt with new dependencies
- Create infrastructure for opening local documentation
- Improve application menu with additional help options
2025-02-16 04:50:33 +03:00
2e2dd9e705 Add custom text and entry widgets with enhanced copy/paste functionality
- Implement CustomText and CustomEntry classes with advanced text interaction features
- Add context menu for text widgets with cut, copy, paste, and select all options
- Support multiple keyboard shortcuts for text manipulation
- Replace standard Tkinter Text and Entry widgets with custom implementations
- Remove global text/entry widget bindings in favor of class-specific methods
2025-02-16 03:57:48 +03:00
d937042ea2 Update application title to reflect project name
- Change window title from "Serial Device Manager" to "ComConfigCopy"
2025-02-16 03:53:21 +03:00
136c7877d3 Refactor TFTP file transfer with improved reliability and error handling
- Implement more robust file transfer mechanism with configurable retry and timeout settings
- Add detailed logging for transfer progress and error scenarios
- Enhance block transfer logic with better error recovery
- Simplify transfer socket management and cleanup process
- Improve overall transfer reliability and error tracking
2025-02-16 03:50:27 +03:00
467d582095 Improve file transfer progress tracking and display
- Add dynamic transfer speed calculation
- Compute and display estimated remaining transfer time
- Enhance remaining bytes display with more informative status
- Update transfers table with more detailed transfer progress information
2025-02-16 03:43:34 +03:00
16526b4643 Merge pull request 'TFTP' (#3) from TFTP into main
Reviewed-on: #3
2025-02-16 00:39:43 +00:00
b8bae39a17 Update .gitignore to include .venv/ directory 2025-02-16 03:37:28 +03:00
9 changed files with 634 additions and 351 deletions

5
.gitignore vendored
View File

@@ -1,9 +1,6 @@
app.log
Settings/settings.json
Configs/Eltex MES2424 AC - Сеть FTTB 2G, доп.txt
Configs/конфиг доп 3750-52 с айпи 172.17.141.133 .txt
DALL·E 2024-12-29 01.01.02 - Square vector logo_ A clean and minimalistic app icon for serial port management software. The design prominently features a simplified rectangular CO.ico
test.py
Configs/
__pycache__/
Firmware/1.jpg
Firmware/2

View File

@@ -5,11 +5,6 @@
# Это программа для копирования конфигураций на коммутаторы
# ------------------------------------------------------------
# import argparse Использовался для получения аргументов из командной строки (не используется)
# import platform Использовался для получения списка сетевых адаптеров (не используется)
# import subprocess Использовался для получения списка сетевых адаптеров (не используется)
# import socket не используется
import json
import logging
import os
@@ -17,6 +12,7 @@ import re
import sys
import threading
import time
import webbrowser
from getpass import getpass
from logging.handlers import RotatingFileHandler
import tkinter as tk
@@ -38,24 +34,24 @@ import serial.tools.list_ports
from serial.serialutil import SerialException
from about_window import AboutWindow
from TFTPServer import TFTPServer
# from TFTPServer import TFTPServerThread
import socket
from update_checker import UpdateChecker
# Версия программы
VERSION = "1.0.1"
# Создаем необходимые папки
os.makedirs("Logs", exist_ok=True)
os.makedirs("Configs", exist_ok=True)
os.makedirs("Settings", exist_ok=True)
os.makedirs("Firmware", exist_ok=True)
os.makedirs("docs", exist_ok=True)
# Файл настроек находится в папке Settings
SETTINGS_FILE = os.path.join("Settings", "settings.json")
# ==========================
# Функции работы с настройками и логированием
# ==========================
# Настройка логирования с использованием RotatingFileHandler.
def setup_logging():
"""Настройка логирования с использованием RotatingFileHandler."""
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
log_path = os.path.join("Logs", "app.log")
@@ -66,8 +62,8 @@ def setup_logging():
handler.setFormatter(formatter)
logger.addHandler(handler)
# Загрузка настроек из JSON-файла или создание настроек по умолчанию.
def settings_load():
"""Загрузка настроек из JSON-файла или создание настроек по умолчанию."""
default_settings = {
"port": None, # Порт для подключения
"baudrate": 9600, # Скорость передачи данных
@@ -120,8 +116,8 @@ def settings_load():
logging.error(f"Ошибка загрузки файла настроек: {e}", exc_info=True)
return default_settings
# Сохранение настроек в JSON-файл
def settings_save(settings):
"""Сохранение настроек в JSON-файл."""
try:
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
json.dump(settings, f, indent=4, ensure_ascii=False)
@@ -129,18 +125,15 @@ def settings_save(settings):
except Exception as e:
logging.error(f"Ошибка при сохранении настроек: {e}", exc_info=True)
# Получение списка доступных последовательных портов
def list_serial_ports():
"""Получение списка доступных последовательных портов."""
ports = serial.tools.list_ports.comports()
logging.debug(f"Найдено {len(ports)} серийных портов.")
return [port.device for port in ports]
# ==========================
# Функции работы с сетевыми адаптерами (не используются)
# ==========================
# Получение списка IP-адресов из сетевых адаптеров
def get_network_adapters():
"""Получение списка сетевых адаптеров и их IP-адресов."""
adapters = []
try:
# Получаем имя хоста
@@ -171,12 +164,8 @@ def get_network_adapters():
return adapters
# ==========================
# Функции работы с COM-соединением
# ==========================
# Создание соединения с устройством через последовательный порт
def create_connection(settings):
"""Создание соединения с устройством через последовательный порт."""
try:
conn = serial.Serial(settings["port"], settings["baudrate"], timeout=1)
logging.info(f"Соединение установлено с {settings['port']} на {settings['baudrate']}.")
@@ -188,12 +177,8 @@ def create_connection(settings):
logging.error(f"Неизвестная ошибка подключения: {e}", exc_info=True)
return None
# Проверка наличия логина и пароля в настройках и отправка их на устройство
def send_login_password(serial_connection, login=None, password=None, is_gui=False):
"""Отправка логина и пароля на устройство."""
if not login:
if is_gui:
login = simpledialog.askstring("Login", "Введите логин:")
@@ -210,14 +195,8 @@ def send_login_password(serial_connection, login=None, password=None, is_gui=Fal
else:
password = getpass("Введите пароль: ")
# Чтение ответа от устройства с учётом таймаута.
def read_response(serial_connection, timeout, login=None, password=None, is_gui=False):
"""
Чтение ответа от устройства с учётом таймаута.
Если в последней строке появляется запрос логина или пароля, данные отправляются автоматически.
"""
response = b""
end_time = time.time() + timeout
decoded = ""
@@ -254,8 +233,8 @@ def read_response(serial_connection, timeout, login=None, password=None, is_gui=
time.sleep(0.1)
return decoded
# Генерация блоков команд для блочного копирования
def generate_command_blocks(lines, block_size):
"""Генерация блоков команд для блочного копирования."""
blocks = []
current_block = []
for line in lines:
@@ -281,6 +260,7 @@ def generate_command_blocks(lines, block_size):
blocks.append("\n".join(current_block))
return blocks
# Выполнение команд из файла конфигурации
def execute_commands_from_file(
serial_connection,
filename,
@@ -292,12 +272,6 @@ def execute_commands_from_file(
password=None,
is_gui=False,
):
"""
Выполнение команд из файла конфигурации.
Если передан log_callback, вывод будет отображаться в GUI.
Теперь при обнаружении ошибки (например, если в ответе присутствует символ '^')
команда будет отправляться повторно.
"""
try:
with open(filename, "r", encoding="utf-8") as file:
lines = [line for line in file if line.strip()]
@@ -436,10 +410,81 @@ def execute_commands_from_file(
log_callback(msg)
logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True)
# ==========================
# Графический интерфейс (Tkinter)
# ==========================
# Базовый класс для кастомных виджетов с поддержкой контекстного меню и горячих клавиш
class CustomWidgetBase:
def __init__(self):
self.create_context_menu()
self.bind_shortcuts()
# Создание контекстного меню
def create_context_menu(self):
"""Создание контекстного меню"""
self.context_menu = tk.Menu(self, tearoff=0)
self.context_menu.add_command(label="Вырезать", command=self.cut)
self.context_menu.add_command(label="Копировать", command=self.copy)
self.context_menu.add_command(label="Вставить", command=self.paste)
self.context_menu.add_separator()
self.context_menu.add_command(label="Выделить всё", command=self.select_all)
self.bind("<Button-3>", self.show_context_menu)
# Привязка горячих клавиш
def bind_shortcuts(self):
"""Привязка горячих клавиш"""
# Стандартные сочетания
self.bind("<Control-x>", lambda e: self.event_generate("<<Cut>>"))
self.bind("<Control-c>", lambda e: self.event_generate("<<Copy>>"))
self.bind("<Control-v>", lambda e: self.event_generate("<<Paste>>"))
self.bind("<Control-a>", self.select_all)
# Дополнительные сочетания
self.bind("<Shift-Insert>", lambda e: self.event_generate("<<Paste>>"))
self.bind("<Control-Insert>", lambda e: self.event_generate("<<Copy>>"))
self.bind("<Shift-Delete>", lambda e: self.event_generate("<<Cut>>"))
# Отображение контекстного меню
def show_context_menu(self, event):
"""Отображение контекстного меню"""
self.context_menu.post(event.x_root, event.y_root)
# Вырезание текста
def cut(self):
self.event_generate("<<Cut>>")
# Копирование текста
def copy(self):
self.event_generate("<<Copy>>")
# Вставка текста
def paste(self):
self.event_generate("<<Paste>>")
# Класс для текстового поля с поддержкой контекстного меню и горячих клавиш
class CustomText(tk.Text, CustomWidgetBase):
def __init__(self, *args, **kwargs):
tk.Text.__init__(self, *args, **kwargs)
CustomWidgetBase.__init__(self)
def select_all(self, event=None):
"""Выделение всего текста в Text виджете"""
self.tag_add(tk.SEL, "1.0", tk.END)
self.mark_set(tk.INSERT, "1.0")
self.see(tk.INSERT)
return "break"
# Класс для поля ввода с поддержкой контекстного меню и горячих клавиш
class CustomEntry(ttk.Entry, CustomWidgetBase):
def __init__(self, *args, **kwargs):
ttk.Entry.__init__(self, *args, **kwargs)
CustomWidgetBase.__init__(self)
def select_all(self, event=None):
"""Выделение всего текста в Entry виджете"""
self.select_range(0, tk.END)
return "break"
# Класс для окна настроек
class SettingsWindow(tk.Toplevel):
def __init__(self, parent, settings, callback=None):
super().__init__(parent)
@@ -479,20 +524,21 @@ class SettingsWindow(tk.Toplevel):
copy_mode_frame = ttk.Frame(settings_frame)
copy_mode_frame.grid(row=3, column=1, sticky=W, pady=5)
ttk.Radiobutton(copy_mode_frame, text="Построчно", value="line",
variable=self.copy_mode_var).pack(side=LEFT)
variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT)
ttk.Radiobutton(copy_mode_frame, text="Блоками", value="block",
variable=self.copy_mode_var).pack(side=LEFT)
variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT)
# Размер блока
ttk.Label(settings_frame, text="Размер блока:").grid(row=4, column=0, sticky=W, pady=5)
self.block_size_label = ttk.Label(settings_frame, text="Размер блока:")
self.block_size_label.grid(row=4, column=0, sticky=W, pady=5)
self.block_size_var = StringVar(value=str(settings.get("block_size", 15)))
block_size_entry = ttk.Entry(settings_frame, textvariable=self.block_size_var)
block_size_entry.grid(row=4, column=1, sticky=W, pady=5)
self.block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var)
self.block_size_entry.grid(row=4, column=1, sticky=W, pady=5)
# Приглашение командной строки
ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5)
self.prompt_var = StringVar(value=settings.get("prompt", ">"))
prompt_entry = ttk.Entry(settings_frame, textvariable=self.prompt_var)
prompt_entry = CustomEntry(settings_frame, textvariable=self.prompt_var)
prompt_entry.grid(row=5, column=1, sticky=W, pady=5)
# Кнопки
@@ -503,9 +549,22 @@ class SettingsWindow(tk.Toplevel):
self.update_ports()
# Инициализация видимости поля размера блока
self.toggle_block_size()
# Центрируем окно
self.center_window()
# Переключение видимости поля размера блока
def toggle_block_size(self):
if self.copy_mode_var.get() == "line":
self.block_size_label.grid_remove()
self.block_size_entry.grid_remove()
else:
self.block_size_label.grid()
self.block_size_entry.grid()
# Центрирование окна
def center_window(self):
self.update_idletasks()
width = self.winfo_width()
@@ -514,12 +573,14 @@ class SettingsWindow(tk.Toplevel):
y = (self.winfo_screenheight() // 2) - (height // 2)
self.geometry(f"{width}x{height}+{x}+{y}")
# Обновление списка доступных последовательных портов
def update_ports(self):
ports = list_serial_ports()
self.port_combo["values"] = ports
if ports and not self.port_var.get():
self.port_var.set(ports[0])
# Сохранение настроек
def save_settings(self):
try:
self.settings.update({
@@ -538,35 +599,179 @@ class SettingsWindow(tk.Toplevel):
except ValueError as e:
messagebox.showerror("Ошибка", "Проверьте правильность введенных значений")
# Общая функция для добавления текста в текстовое поле
def append_text_to_widget(widget, text):
# Проверяем, заканчивается ли текст символом новой строки
if not text.endswith('\n'):
text += '\n'
widget.insert(END, text)
widget.see(END)
# Общая функция для выбора файла конфигурации
def select_config_file(self, var, save_to_settings=False):
filename = filedialog.askopenfilename(
title="Выберите файл конфигурации",
filetypes=[("Text files", "*.txt")]
)
if filename:
var.set(filename)
if save_to_settings:
self.settings["config_file"] = filename
settings_save(self.settings)
# Общая функция для отправки команды и обработки ответа
def send_command_and_process_response(
serial_connection,
cmd,
timeout,
max_attempts=3,
log_callback=None,
login=None,
password=None,
is_gui=False
):
attempt = 0
while attempt < max_attempts:
msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n"
if log_callback:
log_callback(msg)
serial_connection.write((cmd + "\n").encode())
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})")
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
if response:
if '^' in response:
msg = (
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
f"Ответ устройства:\n{response}\n"
f"Повторная отправка команды...\n"
)
if log_callback:
log_callback(msg)
logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.")
attempt += 1
time.sleep(1)
continue
else:
msg = f"Ответ устройства:\n{response}\n"
if log_callback:
log_callback(msg)
logging.info(f"Ответ устройства:\n{response}")
return True, response
else:
msg = f"Ответ не получен для команды: {cmd}\n"
if log_callback:
log_callback(msg)
logging.warning(f"Нет ответа для команды: {cmd}")
return False, None
msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n"
if log_callback:
log_callback(msg)
logging.error(msg)
return False, None
# Основной класс для графического интерфейса
class SerialAppGUI(tk.Tk):
def __init__(self, settings):
super().__init__()
self.title("Serial Device Manager")
self.title("ComConfigCopy")
self.geometry("900x700")
# Добавляем VERSION как атрибут класса
self.VERSION = VERSION
# Инициализация проверки обновлений
self.update_checker = UpdateChecker(
VERSION,
"https://gitea.filow.ru/LowaSC/ComConfigCopy",
include_prereleases=False # Проверяем только стабильные версии
)
# Настройка стиля
self.style = ttk.Style(self)
self.style.theme_use("clam")
default_font = ("Segoe UI", 10)
self.option_add("*Font", default_font)
self.settings = settings
self.connection = None
self.tftp_server = None
# Глобальные биндинги
self.bind_class("Text", "<Control-c>", lambda event: event.widget.event_generate("<<Copy>>"))
self.bind_class("Text", "<Control-v>", lambda event: event.widget.event_generate("<<Paste>>"))
self.bind_class("Text", "<Control-x>", lambda event: event.widget.event_generate("<<Cut>>"))
self.bind_class("Entry", "<Control-c>", lambda event: event.widget.event_generate("<<Copy>>"))
self.bind_class("Entry", "<Control-v>", lambda event: event.widget.event_generate("<<Paste>>"))
self.bind_class("Entry", "<Control-x>", lambda event: event.widget.event_generate("<<Cut>>"))
self.create_menu()
self.create_tabs()
# Проверка первого запуска
self.check_first_run()
# Создание меню
def create_menu(self):
menubar = tk.Menu(self)
self.config(menu=menubar)
# Меню "Файл"
file_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Файл", menu=file_menu)
file_menu.add_command(label="Настройки", command=self.open_settings)
file_menu.add_separator()
file_menu.add_command(label="Выход", command=self.quit)
# Меню "Справка"
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Справка", menu=help_menu)
help_menu.add_command(label="Проверить обновления", command=self.check_for_updates)
help_menu.add_separator()
help_menu.add_command(label="О программе", command=self.open_about)
# Проверка наличия обновлений
def check_for_updates(self):
def on_update_check(update_available, error):
if error:
messagebox.showerror(
"Ошибка проверки обновлений",
f"Не удалось проверить наличие обновлений:\n{error}"
)
elif update_available:
release_info = self.update_checker.get_release_notes()
if release_info:
# Форматируем сообщение
message = (
f"Доступна новая версия {release_info['version']}!\n\n"
f"Тип релиза: {'Пре-релиз' if release_info['type'] == 'prerelease' else 'Стабильный'}\n\n"
"Изменения:\n"
f"{release_info['description']}\n\n"
"Хотите перейти на страницу загрузки?"
)
response = messagebox.askyesno(
"Доступно обновление",
message,
)
if response:
webbrowser.open(release_info["link"])
else:
messagebox.showerror(
"Ошибка",
"Не удалось получить информацию о новой версии"
)
else:
messagebox.showinfo(
"Проверка обновлений",
"У вас установлена последняя версия программы"
)
self.update_checker.check_updates(callback=on_update_check)
# Обработчик изменения настроек
def on_settings_changed(self):
self.settings = settings_load()
# Открытие окна настроек
def open_settings(self):
settings_window = SettingsWindow(self, self.settings, self.on_settings_changed)
settings_window.transient(self)
settings_window.grab_set()
# Проверка первого запуска программы
def check_first_run(self):
"""Проверка первого запуска программы"""
show_settings = False
# Проверяем существование файла настроек
@@ -595,31 +800,7 @@ class SerialAppGUI(tk.Tk):
if response:
self.open_settings()
def create_menu(self):
menubar = tk.Menu(self)
self.config(menu=menubar)
# Меню "Файл"
file_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Файл", menu=file_menu)
file_menu.add_command(label="Настройки", command=self.open_settings)
file_menu.add_separator()
file_menu.add_command(label="Выход", command=self.quit)
# Меню "Справка"
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Справка", menu=help_menu)
help_menu.add_command(label="О программе", command=self.open_about)
def open_settings(self):
settings_window = SettingsWindow(self, self.settings, self.on_settings_changed)
settings_window.transient(self)
settings_window.grab_set()
def on_settings_changed(self):
# Обновляем настройки в основном приложении
self.settings = settings_load()
# Создание вкладок
def create_tabs(self):
self.notebook = ttk.Notebook(self)
self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5)
@@ -640,23 +821,24 @@ class SerialAppGUI(tk.Tk):
self.create_config_editor_tab(config_editor_frame)
self.create_tftp_tab(tftp_frame)
# -------------- Вкладка "Интерактивный режим" --------------
# Создание вкладки "Интерактивный режим"
def create_interactive_tab(self, frame):
control_frame = ttk.Frame(frame)
control_frame.pack(fill=X, pady=5)
ttk.Button(control_frame, text="Подключиться", command=self.connect_device).pack(side=LEFT, padx=5)
ttk.Button(control_frame, text="Отключиться", command=self.disconnect_device).pack(side=LEFT, padx=5)
self.interactive_text = tk.Text(frame, wrap="word", height=20)
self.interactive_text = CustomText(frame, wrap="word", height=20)
self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
input_frame = ttk.Frame(frame)
input_frame.pack(fill=X, pady=5)
ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5)
self.command_entry = ttk.Entry(input_frame, width=50)
self.command_entry = CustomEntry(input_frame, width=50)
self.command_entry.pack(side=LEFT, padx=5)
ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5)
# Подключение к устройству
def connect_device(self):
if self.connection:
messagebox.showinfo("Информация", "Уже подключено.")
@@ -670,6 +852,7 @@ class SerialAppGUI(tk.Tk):
else:
self.append_interactive_text("[ERROR] Не удалось установить соединение.\n")
# Отключение от устройства
def disconnect_device(self):
if self.connection:
try:
@@ -681,6 +864,7 @@ class SerialAppGUI(tk.Tk):
else:
messagebox.showinfo("Информация", "Соединение не установлено.")
# Отправка команды
def send_command(self):
if not self.connection:
messagebox.showerror("Ошибка", "Сначала установите соединение!")
@@ -691,42 +875,19 @@ class SerialAppGUI(tk.Tk):
self.append_interactive_text(f"[INFO] Отправка команды: {cmd}\n")
threading.Thread(target=self.process_command, args=(cmd,), daemon=True).start()
# Обработка команды
def process_command(self, cmd):
try:
max_attempts = 3
attempt = 0
while attempt < max_attempts:
self.connection.write((cmd + "\n").encode())
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})")
response = read_response(
success, response = send_command_and_process_response(
self.connection,
cmd,
self.settings.get("timeout", 10),
max_attempts=3,
log_callback=self.append_interactive_text,
login=self.settings.get("login"),
password=self.settings.get("password"),
is_gui=True,
is_gui=True
)
if response:
if '^' in response:
self.append_interactive_text(
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
f"Ответ устройства:\n{response}\n"
f"Повторная отправка команды...\n"
)
logging.warning(f"Ошибка в команде: {cmd}. Попытка повторной отправки.")
attempt += 1
time.sleep(1)
continue
else:
self.append_interactive_text(f"[INFO] Ответ устройства:\n{response}\n")
logging.info(f"Получен ответ:\n{response}")
break
else:
self.append_interactive_text("[WARN] Ответ не получен.\n")
logging.warning("Нет ответа от устройства в течение таймаута.")
break
if attempt == max_attempts:
self.append_interactive_text(f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n")
logging.error(f"Команда не выполнена корректно после {max_attempts} попыток: {cmd}")
except SerialException as e:
self.append_interactive_text(f"[ERROR] Ошибка при отправке команды: {e}\n")
logging.error(f"Ошибка отправки команды: {e}", exc_info=True)
@@ -734,27 +895,27 @@ class SerialAppGUI(tk.Tk):
self.append_interactive_text(f"[ERROR] Неизвестная ошибка: {e}\n")
logging.error(f"Неизвестная ошибка: {e}", exc_info=True)
# Добавление текста в текстовое поле
def append_interactive_text(self, text):
self.interactive_text.insert(END, text)
self.interactive_text.see(END)
append_text_to_widget(self.interactive_text, text)
# -------------- Вкладка "Выполнить команды из файла" --------------
# Создание вкладки "Выполнить команды из файла"
def create_file_exec_tab(self, frame):
file_frame = ttk.Frame(frame)
file_frame.pack(fill=X, pady=5)
ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
self.file_exec_var = StringVar(value=self.settings.get("config_file") or "")
ttk.Entry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5)
CustomEntry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5)
ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5)
ttk.Button(frame, text="Выполнить команды", command=self.execute_file_commands).pack(pady=5)
self.file_exec_text = tk.Text(frame, wrap="word", height=15)
self.file_exec_text = CustomText(frame, wrap="word", height=15)
self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Выбор файла конфигурации для выполнения команд
def select_config_file_fileexec(self):
filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")])
if filename:
self.file_exec_var.set(filename)
select_config_file(self, self.file_exec_var)
# Выполнение команд из файла
def execute_file_commands(self):
if not self.settings.get("port"):
messagebox.showerror("Ошибка", "COM-порт не выбран!")
@@ -784,29 +945,26 @@ class SerialAppGUI(tk.Tk):
).start()
def append_file_exec_text(self, text):
self.file_exec_text.insert(END, text)
self.file_exec_text.see(END)
append_text_to_widget(self.file_exec_text, text)
# -------------- Вкладка "Редактор конфигурационного файла" --------------
# Создание вкладки "Редактор конфигурационного файла"
def create_config_editor_tab(self, frame):
top_frame = ttk.Frame(frame)
top_frame.pack(fill=X, pady=5)
ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
self.editor_file_var = StringVar(value=self.settings.get("config_file") or "")
ttk.Entry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5)
CustomEntry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5)
self.config_editor_text = tk.Text(frame, wrap="word")
self.config_editor_text = CustomText(frame, wrap="word")
self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Выбор файла конфигурации для редактирования
def select_config_file_editor(self):
filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")])
if filename:
self.editor_file_var.set(filename)
self.settings["config_file"] = filename
settings_save(self.settings)
select_config_file(self, self.editor_file_var, save_to_settings=True)
# Загрузка файла конфигурации
def load_config_file(self):
filename = self.editor_file_var.get()
if not filename or not os.path.exists(filename):
@@ -822,6 +980,7 @@ class SerialAppGUI(tk.Tk):
logging.error(f"Ошибка загрузки файла: {e}", exc_info=True)
messagebox.showerror("Ошибка", f"Не удалось загрузить файл:\n{e}")
# Сохранение файла конфигурации
def save_config_file(self):
filename = self.editor_file_var.get()
if not filename:
@@ -836,13 +995,14 @@ class SerialAppGUI(tk.Tk):
logging.error(f"Ошибка сохранения файла: {e}", exc_info=True)
messagebox.showerror("Ошибка", f"Не удалось сохранить файл:\n{e}")
# Открытие окна "О программе"
def open_about(self):
about_window = AboutWindow(self)
about_window.transient(self)
about_window.grab_set()
# Создание вкладки TFTP сервера
def create_tftp_tab(self, frame):
"""Создание вкладки TFTP сервера."""
# Создаем фрейм для управления TFTP сервером
tftp_frame = ttk.Frame(frame)
tftp_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
@@ -868,7 +1028,7 @@ class SerialAppGUI(tk.Tk):
port_frame.pack(fill=X, padx=5, pady=2)
ttk.Label(port_frame, text="Порт:").pack(side=LEFT, padx=5)
self.tftp_port_var = StringVar(value="69")
self.tftp_port_entry = ttk.Entry(port_frame, textvariable=self.tftp_port_var)
self.tftp_port_entry = CustomEntry(port_frame, textvariable=self.tftp_port_var)
self.tftp_port_entry.pack(fill=X, expand=True, padx=5)
# Кнопки управления
@@ -894,7 +1054,7 @@ class SerialAppGUI(tk.Tk):
log_frame = ttk.LabelFrame(tftp_frame, text="Лог сервера")
log_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
self.tftp_log_text = tk.Text(log_frame, wrap=tk.WORD, height=10)
self.tftp_log_text = CustomText(log_frame, wrap=tk.WORD, height=10)
self.tftp_log_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Добавляем скроллбар для лога
@@ -930,8 +1090,8 @@ class SerialAppGUI(tk.Tk):
self.tftp_server = None
self.tftp_server_thread = None
# Запуск TFTP сервера
def start_tftp_server(self):
"""Запуск TFTP сервера."""
try:
# Получаем выбранный IP-адрес
ip = self.tftp_ip_var.get()
@@ -993,15 +1153,15 @@ class SerialAppGUI(tk.Tk):
self.append_tftp_log(f"[ERROR] Ошибка запуска сервера: {str(e)}")
messagebox.showerror("Ошибка", f"Не удалось запустить TFTP сервер: {str(e)}")
# Запуск TFTP сервера в отдельном потоке
def run_tftp_server(self, ip, port):
"""Запуск TFTP сервера в отдельном потоке."""
try:
self.tftp_server.start_server(ip, port)
except Exception as e:
self.append_tftp_log(f"[ERROR] Ошибка работы сервера: {str(e)}")
# Остановка TFTP сервера
def stop_tftp_server(self):
"""Остановка TFTP сервера."""
if self.tftp_server:
try:
# Отключаем кнопки на время остановки сервера
@@ -1045,13 +1205,11 @@ class SerialAppGUI(tk.Tk):
self.start_tftp_button.config(state="disabled")
self.stop_tftp_button.config(state="normal")
def append_tftp_log(self, message):
"""Добавление сообщения в лог TFTP сервера."""
self.tftp_log_text.insert(END, message + "\n")
self.tftp_log_text.see(END)
def append_tftp_log(self, text):
append_text_to_widget(self.tftp_log_text, text)
# Обновление информации об активных передачах
def update_transfers_info(self):
"""Обновление информации об активных передачах."""
if not self.tftp_server:
return
@@ -1068,108 +1226,44 @@ class SerialAppGUI(tk.Tk):
# Вычисляем прогресс
progress = f"{bytes_sent}/{filesize} байт"
remaining = filesize - bytes_sent
remaining_bytes = filesize - bytes_sent
elapsed_time = time.time() - start_time
# Вычисляем скорость передачи (байт/сек)
if elapsed_time > 0:
transfer_speed = bytes_sent / elapsed_time
# Вычисляем оставшееся время
if transfer_speed > 0:
remaining_time = remaining_bytes / transfer_speed
remaining_str = f"{remaining_bytes} байт (~{int(remaining_time)}с)"
else:
remaining_str = f"{remaining_bytes} байт (неизвестно)"
else:
remaining_str = f"{remaining_bytes} байт (вычисляется...)"
# Добавляем запись в таблицу
self.transfers_tree.insert("", END, values=(
f"{client_addr[0]}:{client_addr[1]}",
filename,
progress,
f"{remaining} байт",
remaining_str,
f"{elapsed_time:.1f}с"
))
# Периодическое обновление информации о передачах
def update_transfers_periodically(self):
"""Периодическое обновление информации о передачах."""
if self.tftp_server and self.tftp_server.running:
self.update_transfers_info()
# Планируем следующее обновление через 1 секунду
self.after(1000, self.update_transfers_periodically)
# Обновление списка сетевых адаптеров
def update_network_adapters(self):
"""Обновление списка сетевых адаптеров."""
adapters = get_network_adapters()
self.tftp_ip_combo["values"] = adapters
if not self.tftp_ip_var.get() in adapters:
self.tftp_ip_var.set(adapters[0])
# ==========================
# Парсер аргументов (не используется)
# ==========================
# def parse_arguments():
# parser = argparse.ArgumentParser(
# description="Программа для работы с устройствами через последовательный порт с графическим интерфейсом."
# )
# parser.add_argument("--cli", action="store_true", help="Запустить в режиме командной строки (без графики)")
# return parser.parse_args()
# ==========================
# Режим командной строки (не используется)
# ==========================
# def run_cli_mode(settings):
# print("Запущен режим командной строки.")
# while True:
# print("\n--- Главное меню ---")
# print("1. Подключиться и войти в интерактивный режим")
# print("2. Выполнить команды из файла конфигурации")
# print("0. Выход")
# choice = input("Выберите действие: ").strip()
# if choice == "1":
# if not settings.get("port"):
# print("[ERROR] COM-порт не выбран!")
# continue
# conn = create_connection(settings)
# if not conn:
# print("[ERROR] Не удалось установить соединение.")
# continue
# print("[INFO] Введите команды (для выхода введите _exit):")
# while True:
# cmd = input("Команда: ")
# if cmd.strip().lower() == "_exit":
# break
# try:
# conn.write((cmd + "\n").encode())
# response = read_response(
# conn, settings.get("timeout", 10),
# login=settings.get("login"),
# password=settings.get("password"),
# is_gui=False
# )
# print(response)
# except Exception as e:
# print(f"[ERROR] {e}")
# break
# conn.close()
# elif choice == "2":
# if not settings.get("config_file"):
# print("[ERROR] Файл конфигурации не выбран!")
# continue
# if not settings.get("port"):
# print("[ERROR] COM-порт не выбран!")
# continue
# conn = create_connection(settings)
# if conn:
# execute_commands_from_file(
# conn,
# settings["config_file"],
# settings.get("timeout", 10),
# settings.get("copy_mode", "line"),
# settings.get("block_size", 15),
# lambda msg: print(msg),
# login=settings.get("login"),
# password=settings.get("password"),
# is_gui=False,
# )
# conn.close()
# else:
# print("[ERROR] Не удалось установить соединение.")
# elif choice == "0":
# break
# else:
# print("[ERROR] Некорректный выбор.")
# ==========================
# Основной запуск приложения
# ==========================

BIN
Icon.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

44
README.md Normal file
View File

@@ -0,0 +1,44 @@
# ComConfigCopy
Программа для копирования конфигураций на коммутаторы.
## Описание
ComConfigCopy - это утилита, разработанная для автоматизации процесса копирования конфигураций на сетевые коммутаторы. Программа предоставляет удобный графический интерфейс для управления процессом копирования и настройки параметров подключения.
## Основные возможности
- Копирование конфигураций на коммутаторы через COM-порт
- Поддержка различных скоростей подключения
- Автоматическое определение доступных COM-портов
- Возможность сохранения и загрузки настроек
- Автоматическое обновление через GitHub
## Системные требования
- Windows 7/8/10/11
- Python 3.8 или выше
- Доступ к COM-портам
## Установка
1. Скачайте последнюю версию программы из [репозитория](https://gitea.filow.ru/LowaSC/ComConfigCopy/releases)
2. Распакуйте архив в удобное место
3. Запустите файл `ComConfigCopy.exe`
## Использование
1. Выберите COM-порт из списка доступных
2. Настройте параметры подключения (скорость, биты данных и т.д.)
3. Выберите файл конфигурации для отправки
4. Нажмите кнопку "Отправить" для начала процесса копирования
## Контакты
- Email: SPRF555@gmail.com
- Telegram: [@LowaSC](https://t.me/LowaSC)
- Репозиторий: [ComConfigCopy](https://gitea.filow.ru/LowaSC/ComConfigCopy)
## Лицензия
Этот проект распространяется под лицензией MIT. Подробности смотрите в файле [LICENSE](LICENSE).

View File

@@ -224,6 +224,8 @@ class TFTPServer:
Передача файла клиенту по протоколу TFTP.
"""
BLOCK_SIZE = 512
MAX_RETRIES = 5
TIMEOUT = 2.0
transfer_socket = None
try:
if not os.path.exists(file_path):
@@ -249,124 +251,99 @@ class TFTPServer:
'start_time': start_time
}
self.log(f"[INFO] Начало передачи файла '{file_basename}' клиенту {client_addr}. Размер файла: {filesize} байт.")
# Создаем отдельный сокет для передачи файла
# Создаем новый сокет для передачи данных
transfer_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
transfer_socket.settimeout(5.0) # Таймаут ожидания ACK
transfer_socket.settimeout(TIMEOUT)
# Добавляем сокет в множество активных сокетов
with self.lock:
self.transfer_sockets.add(transfer_socket)
block_num = 1
bytes_sent = 0
last_progress_time = time.time()
self.log(f"[INFO] Начало передачи файла '{file_basename}' клиенту {client_addr}. Размер файла: {filesize} байт.")
try:
with open(file_path, "rb") as f:
while self.running: # Проверяем флаг running
try:
data_block = f.read(BLOCK_SIZE)
if not data_block: # Достигнут конец файла
break
with open(file_path, 'rb') as file:
block_number = 1
last_successful_block = 0
# Проверяем флаг running перед отправкой блока
if not self.running:
raise Exception("Передача прервана: сервер остановлен")
while True:
# Читаем блок данных
data = file.read(BLOCK_SIZE)
# Формируем TFTP пакет данных
packet = struct.pack("!HH", 3, block_num) + data_block
attempts = 0
ack_received = False
# Попытка отправки текущего блока (до 3 повторных попыток)
while attempts < 3 and not ack_received and self.running:
if transfer_socket is None:
raise Exception("Сокет передачи закрыт")
# Формируем и отправляем пакет данных
packet = struct.pack('!HH', 3, block_number) + data
retries = 0
while retries < MAX_RETRIES:
try:
transfer_socket.sendto(packet, client_addr)
# Логируем прогресс каждую секунду
current_time = time.time()
if current_time - last_progress_time >= 1.0:
elapsed_time = current_time - start_time
remaining = filesize - bytes_sent
self.log(f"[STATUS] Клиент: {client_addr} | Файл: {file_basename} | "
f"Отправлено: {bytes_sent}/{filesize} байт | "
f"Осталось: {remaining} байт | "
f"Время: {elapsed_time:.2f} сек.")
last_progress_time = current_time
# Ожидаем подтверждение
ack_data, addr = transfer_socket.recvfrom(4)
if addr == client_addr:
ack_opcode, ack_block = struct.unpack("!HH", ack_data)
if ack_opcode == 4 and ack_block == block_num:
ack_received = True
bytes_sent += len(data_block)
while True:
try:
ack_data, ack_addr = transfer_socket.recvfrom(4)
if ack_addr == client_addr and len(ack_data) >= 4:
opcode, ack_block = struct.unpack('!HH', ack_data)
if opcode == 4: # ACK
if ack_block == block_number:
# Успешное подтверждение
last_successful_block = block_number
bytes_sent = min((block_number * BLOCK_SIZE), filesize)
# Обновляем информацию о прогрессе
with self.lock:
if client_addr in self.active_transfers:
self.active_transfers[client_addr]['bytes_sent'] = bytes_sent
else:
self.log(f"[WARN] Неверный ACK от {client_addr}. "
f"Ожидался блок {block_num}, получен {ack_block}.")
# Логируем статус каждую секунду
current_time = time.time()
if current_time - start_time >= 1.0:
bytes_remaining = filesize - bytes_sent
elapsed_time = current_time - start_time
self.log(f"[STATUS] Клиент: {client_addr} | Файл: {file_basename} | "
f"Отправлено: {bytes_sent}/{filesize} байт | "
f"Осталось: {bytes_remaining} байт | "
f"Время: {elapsed_time:.2f} сек.")
break
elif ack_block < block_number:
# Получен старый ACK, игнорируем
continue
except socket.timeout:
attempts += 1
self.log(f"[WARN] Таймаут ожидания ACK для блока {block_num} "
f"от {client_addr}. Попытка {attempts+1}.")
except socket.error as e:
if not self.running:
raise Exception("Передача прервана: сервер остановлен")
self.log(f"[ERROR] Ошибка сокета при отправке блока {block_num}: {str(e)}")
attempts += 1
break
if last_successful_block == block_number:
break
else:
retries += 1
self.log(f"[WARN] Таймаут ожидания ACK для блока {block_number} от {client_addr}. "
f"Попытка {retries + 1}.")
except Exception as e:
if not self.running:
raise Exception("Передача прервана: сервер остановлен")
self.log(f"[ERROR] Ошибка при отправке блока {block_num}: {str(e)}")
attempts += 1
retries += 1
self.log(f"[ERROR] Ошибка при передаче блока {block_number}: {str(e)}")
if not ack_received:
raise Exception(f"Не удалось получить подтверждение для блока {block_num}")
if retries >= MAX_RETRIES:
self.log(f"[ERROR] Превышено максимальное количество попыток для блока {block_number}")
return
block_num = (block_num + 1) % 65536
block_number += 1
# Если отправили меньше BLOCK_SIZE байт, это был последний блок
if len(data) < BLOCK_SIZE:
break
self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} завершена успешно")
except Exception as e:
if not self.running:
raise Exception("Передача прервана: сервер остановлен")
raise
if bytes_sent == filesize:
elapsed_time = time.time() - start_time
self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} "
f"завершена за {elapsed_time:.2f} сек. Всего отправлено {bytes_sent} байт.")
except Exception as e:
if not self.running:
self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} прервана: сервер остановлен")
raise
except Exception as e:
if not self.running:
return # Не логируем повторно о прерывании передачи
self.log(f"[ERROR] Ошибка при передаче файла '{os.path.basename(file_path)}' "
f"клиенту {client_addr}: {str(e)}")
try:
self.send_error(client_addr, 0, str(e))
except:
pass
self.log(f"[ERROR] Ошибка при передаче файла: {str(e)}")
finally:
# Закрываем сокет передачи
if transfer_socket:
try:
with self.lock:
self.transfer_sockets.discard(transfer_socket)
transfer_socket.close()
transfer_socket = None
except:
pass
# Удаляем информацию о передаче
# Очищаем информацию о передаче
with self.lock:
if client_addr in self.active_transfers:
del self.active_transfers[client_addr]
if transfer_socket in self.transfer_sockets:
self.transfer_sockets.remove(transfer_socket)
if transfer_socket:
try:
transfer_socket.close()
except:
pass

View File

@@ -2,19 +2,22 @@
# -*- coding: utf-8 -*-
import tkinter as tk
from tkinter import ttk, BOTH, X, BOTTOM
from tkinter import ttk, BOTH, X, BOTTOM, END
import webbrowser
class AboutWindow(tk.Toplevel):
def __init__(self, parent):
super().__init__(parent)
self.title("О программе")
self.geometry("400x300")
self.geometry("600x500")
self.resizable(False, False)
# Создаем фрейм
# Сохраняем ссылку на родительское окно
self.parent = parent
# Создаем фрейм для содержимого
about_frame = ttk.Frame(self, padding="20")
about_frame.pack(fill=BOTH, expand=True)
about_frame.pack(fill=BOTH, expand=True, padx=10, pady=10)
# Заголовок
ttk.Label(
@@ -33,7 +36,7 @@ class AboutWindow(tk.Toplevel):
# Версия
ttk.Label(
about_frame,
text="Версия 1.0",
text=f"Версия {getattr(parent, 'VERSION', '1.0.0')}",
font=("Segoe UI", 10)
).pack(pady=(0, 20))
@@ -66,7 +69,7 @@ class AboutWindow(tk.Toplevel):
ttk.Label(
contact_frame,
text="Email: LowaWorkMail@gmail.com"
text="Email: SPRF555@gmail.com"
).pack(anchor="w")
telegram_link = ttk.Label(
@@ -80,10 +83,10 @@ class AboutWindow(tk.Toplevel):
# Кнопка закрытия
ttk.Button(
about_frame,
self,
text="Закрыть",
command=self.destroy
).pack(side=BOTTOM, pady=(20, 0))
).pack(side=BOTTOM, pady=10)
# Центрируем окно
self.center_window()

Binary file not shown.

View File

@@ -1 +1,4 @@
tftpy>=0.8.0
pyserial>=3.5
requests>=2.31.0
packaging>=23.2

165
update_checker.py Normal file
View File

@@ -0,0 +1,165 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import requests
import threading
import re
from packaging import version
import xml.etree.ElementTree as ET
import html
class UpdateCheckError(Exception):
"""Исключение для ошибок проверки обновлений"""
pass
class ReleaseType:
"""Типы релизов"""
STABLE = "stable"
PRERELEASE = "prerelease"
class UpdateChecker:
"""Класс для проверки обновлений программы"""
def __init__(self, current_version, repo_url, include_prereleases=False):
self.current_version = current_version
self.repo_url = repo_url
self.include_prereleases = include_prereleases
self.rss_url = f"{repo_url}/releases.rss"
self.release_info = None
def _clean_html(self, html_text):
"""Очищает HTML-разметку и форматирует текст"""
if not html_text:
return ""
text = re.sub(r'<[^>]+>', '', html_text)
text = html.unescape(text)
text = re.sub(r'\n\s*\n', '\n\n', text)
return '\n'.join(line.strip() for line in text.splitlines()).strip()
def _parse_release_info(self, item):
"""Извлекает информацию о релизе из RSS item"""
title = item.find('title').text if item.find('title') is not None else ''
link = item.find('link').text if item.find('link') is not None else ''
description = item.find('description').text if item.find('description') is not None else ''
content = item.find('{http://purl.org/rss/1.0/modules/content/}encoded')
content_text = content.text if content is not None else ''
# Извлекаем версию и проверяем тип релиза из тега
version_match = re.search(r'/releases/tag/(?:pre-)?v?(\d+\.\d+(?:\.\d+)?)', link)
if not version_match:
return None
version_str = version_match.group(1)
# Проверяем наличие префикса pre- в теге
is_prerelease = 'pre-' in link.lower()
# Форматируем название релиза
formatted_title = title
if title == version_str or not title.strip():
# Если заголовок пустой или совпадает с версией, создаем стандартное название
release_type = "Пре-релиз" if is_prerelease else "Версия"
formatted_title = f"{release_type} {version_str}"
elif not re.search(version_str, title):
# Если версия не указана в заголовке, добавляем её
formatted_title = f"{title} ({version_str})"
# Форматируем описание
formatted_description = self._clean_html(content_text or description)
if not formatted_description.strip():
formatted_description = "Нет описания"
# Добавляем метку типа релиза в начало описания
release_type_label = "[Пре-релиз] " if is_prerelease else ""
formatted_description = f"{release_type_label}{formatted_description}"
return {
'title': formatted_title,
'link': link,
'description': formatted_description,
'version': version_str,
'type': ReleaseType.PRERELEASE if is_prerelease else ReleaseType.STABLE
}
def check_updates(self, callback=None):
"""Проверяет наличие обновлений в асинхронном режиме"""
def check_worker():
try:
logging.info(f"Текущая версия программы: {self.current_version}")
logging.info(f"Проверка пре-релизов: {self.include_prereleases}")
logging.info(f"Запрос RSS ленты: {self.rss_url}")
response = requests.get(self.rss_url, timeout=10)
response.raise_for_status()
root = ET.fromstring(response.content)
items = root.findall('.//item')
if not items:
raise UpdateCheckError("Релизы не найдены")
logging.info(f"Найдено {len(items)} релизов")
latest_version = None
latest_info = None
for item in items:
release_info = self._parse_release_info(item)
if not release_info:
continue
is_prerelease = release_info['type'] == ReleaseType.PRERELEASE
logging.info(
f"Проверка релиза: {release_info['title']}, "
f"версия: {release_info['version']}, "
f"тип: {'пре-релиз' if is_prerelease else 'стабильная'}"
)
# Пропускаем пре-релизы если они не включены
if is_prerelease and not self.include_prereleases:
logging.info(f"Пропуск пре-релиза: {release_info['version']}")
continue
# Сравниваем версии
try:
current_ver = version.parse(latest_version or "0.0.0")
new_ver = version.parse(release_info['version'].split('-')[0]) # Убираем суффикс для сравнения
if new_ver > current_ver:
latest_version = release_info['version']
latest_info = release_info
logging.info(f"Новая версия: {latest_version}")
except version.InvalidVersion as e:
logging.warning(f"Некорректный формат версии {release_info['version']}: {e}")
continue
if not latest_info:
raise UpdateCheckError("Не найдены подходящие версии")
self.release_info = latest_info
# Сравниваем с текущей версией
current_ver = version.parse(self.current_version)
latest_ver = version.parse(latest_version.split('-')[0])
update_available = latest_ver > current_ver
logging.info(f"Сравнение версий: текущая {current_ver} <-> последняя {latest_ver}")
logging.info(f"Доступно обновление: {update_available}")
if callback:
callback(update_available, None)
except UpdateCheckError as e:
logging.error(str(e))
if callback:
callback(False, str(e))
except Exception as e:
logging.error(f"Ошибка при проверке обновлений: {e}", exc_info=True)
if callback:
callback(False, str(e))
threading.Thread(target=check_worker, daemon=True).start()
def get_release_notes(self):
"""Возвращает информацию о последнем релизе"""
return self.release_info