13 Commits

Author SHA1 Message Date
2f4b2985cd Remove documentation and CLI mode features
- Remove "Documentation" menu option
- Delete unused CLI mode code
- Clean up commented-out argument parsing function
2025-02-16 05:00:43 +03:00
d1a870fed7 Add update checking and documentation features
- Implement UpdateChecker for version comparison and update notifications
- Add menu options for documentation and update checking
- Enhance AboutWindow with dynamic version display
- Update requirements.txt with new dependencies
- Create infrastructure for opening local documentation
- Improve application menu with additional help options
2025-02-16 04:50:33 +03:00
2e2dd9e705 Add custom text and entry widgets with enhanced copy/paste functionality
- Implement CustomText and CustomEntry classes with advanced text interaction features
- Add context menu for text widgets with cut, copy, paste, and select all options
- Support multiple keyboard shortcuts for text manipulation
- Replace standard Tkinter Text and Entry widgets with custom implementations
- Remove global text/entry widget bindings in favor of class-specific methods
2025-02-16 03:57:48 +03:00
d937042ea2 Update application title to reflect project name
- Change window title from "Serial Device Manager" to "ComConfigCopy"
2025-02-16 03:53:21 +03:00
136c7877d3 Refactor TFTP file transfer with improved reliability and error handling
- Implement more robust file transfer mechanism with configurable retry and timeout settings
- Add detailed logging for transfer progress and error scenarios
- Enhance block transfer logic with better error recovery
- Simplify transfer socket management and cleanup process
- Improve overall transfer reliability and error tracking
2025-02-16 03:50:27 +03:00
467d582095 Improve file transfer progress tracking and display
- Add dynamic transfer speed calculation
- Compute and display estimated remaining transfer time
- Enhance remaining bytes display with more informative status
- Update transfers table with more detailed transfer progress information
2025-02-16 03:43:34 +03:00
16526b4643 Merge pull request 'TFTP' (#3) from TFTP into main
Reviewed-on: #3
2025-02-16 00:39:43 +00:00
6d2819a860 Add network adapter selection for TFTP server
- Implement `get_network_adapters()` function to dynamically retrieve available network interfaces
- Replace IP address entry with a combobox for network adapter selection
- Add "Update" button to refresh network adapter list
- Improve IP address validation and error handling for TFTP server configuration
- Enhance UI with more user-friendly network interface selection
2025-02-16 03:34:57 +03:00
a252a0f153 Prevent duplicate TFTP server log messages and improve state tracking
- Add filtering mechanism to prevent repeated server start/stop log entries
- Implement state tracking flags to manage server status
- Remove redundant log messages in both ComConfigCopy.py and TFTPServer.py
- Enhance log callback to avoid unnecessary logging of server state changes
2025-02-16 03:31:32 +03:00
3126811f09 Improve TFTP server shutdown and error handling
- Enhance server stop mechanism with more robust socket and thread management
- Add better handling of active transfers during server shutdown
- Implement additional safety checks and timeout handling
- Improve logging and error reporting for server stop process
- Prevent potential deadlocks and resource leaks during server termination
2025-02-16 03:28:53 +03:00
f1ca31c198 Enhance TFTP server implementation with advanced monitoring and UI improvements
- Completely refactor TFTP server implementation with more robust file transfer handling
- Add detailed transfer tracking with active transfers table
- Implement periodic transfer status updates
- Improve log and UI layout for TFTP server tab
- Add more granular error handling and logging
- Enhance threading and socket management for file transfers
2025-02-16 03:19:44 +03:00
c95915483f Update .gitignore to include virtual environment directory
- Add '.venv/' to .gitignore to exclude Python virtual environment files
2025-02-16 03:09:23 +03:00
299ce329f7 Add TFTP server functionality to the application
- Implement TFTP server tab with IP and port configuration
- Create methods to start and stop TFTP server
- Add logging functionality for TFTP server events
- Integrate TFTPServer class into the main application
- Re-enable Firmware directory creation
2025-02-16 02:51:47 +03:00
6 changed files with 1072 additions and 145 deletions

View File

@@ -17,6 +17,7 @@ import re
import sys
import threading
import time
import webbrowser
from getpass import getpass
from logging.handlers import RotatingFileHandler
import tkinter as tk
@@ -37,13 +38,20 @@ import serial
import serial.tools.list_ports
from serial.serialutil import SerialException
from about_window import AboutWindow
from TFTPServer import TFTPServer
# from TFTPServer import TFTPServerThread
import socket
from update_checker import UpdateChecker
# Версия программы
VERSION = "1.0.0"
# Создаем необходимые папки
os.makedirs("Logs", exist_ok=True)
os.makedirs("Configs", exist_ok=True)
os.makedirs("Settings", exist_ok=True)
# os.makedirs("Firmware", exist_ok=True)
os.makedirs("Firmware", exist_ok=True)
os.makedirs("docs", exist_ok=True)
# Файл настроек находится в папке Settings
SETTINGS_FILE = os.path.join("Settings", "settings.json")
@@ -137,25 +145,37 @@ def list_serial_ports():
# Функции работы с сетевыми адаптерами (не используются)
# ==========================
# def list_network_adapters():
# """Возвращает список названий сетевых адаптеров (Windows)."""
# adapters = []
# if platform.system() == "Windows":
# try:
# output = subprocess.check_output(
# 'wmic nic get NetConnectionID',
# shell=True,
# encoding="cp866"
# )
# for line in output.splitlines():
# line = line.strip()
# if line and line != "NetConnectionID":
# adapters.append(line)
# except Exception as e:
# logging.error(f"Ошибка при получении списка адаптеров: {e}", exc_info=True)
# else:
# adapters = ["eth0"]
# return adapters
def get_network_adapters():
"""Получение списка сетевых адаптеров и их IP-адресов."""
adapters = []
try:
# Получаем имя хоста
hostname = socket.gethostname()
# Получаем все адреса для данного хоста
addresses = socket.getaddrinfo(hostname, None)
# Создаем множество для хранения уникальных IP-адресов
unique_ips = set()
for addr in addresses:
ip = addr[4][0]
# Пропускаем IPv6 и локальные адреса
if ':' not in ip and not ip.startswith('127.'):
unique_ips.add(ip)
# Добавляем все найденные IP-адреса в список
for ip in sorted(unique_ips):
adapters.append(f"{ip}")
# Добавляем 0.0.0.0 для прослушивания всех интерфейсов
adapters.insert(0, "0.0.0.0")
except Exception as e:
logging.error(f"Ошибка при получении списка сетевых адаптеров: {e}", exc_info=True)
# В случае ошибки возвращаем хотя бы 0.0.0.0
adapters = ["0.0.0.0"]
return adapters
# ==========================
# Функции работы с COM-соединением
@@ -423,9 +443,109 @@ def execute_commands_from_file(
logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True)
# ==========================
# Графический интерфейс (Tkinter)
# Улучшенные текстовые виджеты
# ==========================
class CustomText(tk.Text):
"""Улучшенный текстовый виджет с расширенной функциональностью копирования/вставки"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.create_context_menu()
self.bind_shortcuts()
def create_context_menu(self):
self.context_menu = tk.Menu(self, tearoff=0)
self.context_menu.add_command(label="Вырезать", command=self.cut)
self.context_menu.add_command(label="Копировать", command=self.copy)
self.context_menu.add_command(label="Вставить", command=self.paste)
self.context_menu.add_separator()
self.context_menu.add_command(label="Выделить всё", command=self.select_all)
self.bind("<Button-3>", self.show_context_menu)
def show_context_menu(self, event):
self.context_menu.post(event.x_root, event.y_root)
def bind_shortcuts(self):
# Стандартные сочетания
self.bind("<Control-x>", lambda e: self.event_generate("<<Cut>>"))
self.bind("<Control-c>", lambda e: self.event_generate("<<Copy>>"))
self.bind("<Control-v>", lambda e: self.event_generate("<<Paste>>"))
self.bind("<Control-a>", self.select_all)
# Shift+Insert для вставки
self.bind("<Shift-Insert>", lambda e: self.event_generate("<<Paste>>"))
# Ctrl+Insert для копирования
self.bind("<Control-Insert>", lambda e: self.event_generate("<<Copy>>"))
# Shift+Delete для вырезания
self.bind("<Shift-Delete>", lambda e: self.event_generate("<<Cut>>"))
def cut(self):
self.event_generate("<<Cut>>")
def copy(self):
self.event_generate("<<Copy>>")
def paste(self):
self.event_generate("<<Paste>>")
def select_all(self, event=None):
self.tag_add(tk.SEL, "1.0", tk.END)
self.mark_set(tk.INSERT, "1.0")
self.see(tk.INSERT)
return "break"
class CustomEntry(ttk.Entry):
"""Улучшенное поле ввода с расширенной функциональностью копирования/вставки"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.create_context_menu()
self.bind_shortcuts()
def create_context_menu(self):
self.context_menu = tk.Menu(self, tearoff=0)
self.context_menu.add_command(label="Вырезать", command=self.cut)
self.context_menu.add_command(label="Копировать", command=self.copy)
self.context_menu.add_command(label="Вставить", command=self.paste)
self.context_menu.add_separator()
self.context_menu.add_command(label="Выделить всё", command=self.select_all)
self.bind("<Button-3>", self.show_context_menu)
def show_context_menu(self, event):
self.context_menu.post(event.x_root, event.y_root)
def bind_shortcuts(self):
# Стандартные сочетания
self.bind("<Control-x>", lambda e: self.event_generate("<<Cut>>"))
self.bind("<Control-c>", lambda e: self.event_generate("<<Copy>>"))
self.bind("<Control-v>", lambda e: self.event_generate("<<Paste>>"))
self.bind("<Control-a>", self.select_all)
# Shift+Insert для вставки
self.bind("<Shift-Insert>", lambda e: self.event_generate("<<Paste>>"))
# Ctrl+Insert для копирования
self.bind("<Control-Insert>", lambda e: self.event_generate("<<Copy>>"))
# Shift+Delete для вырезания
self.bind("<Shift-Delete>", lambda e: self.event_generate("<<Cut>>"))
def cut(self):
self.event_generate("<<Cut>>")
def copy(self):
self.event_generate("<<Copy>>")
def paste(self):
self.event_generate("<<Paste>>")
def select_all(self, event=None):
self.select_range(0, tk.END)
return "break"
class SettingsWindow(tk.Toplevel):
def __init__(self, parent, settings, callback=None):
super().__init__(parent)
@@ -472,13 +592,13 @@ class SettingsWindow(tk.Toplevel):
# Размер блока
ttk.Label(settings_frame, text="Размер блока:").grid(row=4, column=0, sticky=W, pady=5)
self.block_size_var = StringVar(value=str(settings.get("block_size", 15)))
block_size_entry = ttk.Entry(settings_frame, textvariable=self.block_size_var)
block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var)
block_size_entry.grid(row=4, column=1, sticky=W, pady=5)
# Приглашение командной строки
ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5)
self.prompt_var = StringVar(value=settings.get("prompt", ">"))
prompt_entry = ttk.Entry(settings_frame, textvariable=self.prompt_var)
prompt_entry = CustomEntry(settings_frame, textvariable=self.prompt_var)
prompt_entry.grid(row=5, column=1, sticky=W, pady=5)
# Кнопки
@@ -527,22 +647,27 @@ class SettingsWindow(tk.Toplevel):
class SerialAppGUI(tk.Tk):
def __init__(self, settings):
super().__init__()
self.title("Serial Device Manager")
self.title("ComConfigCopy")
self.geometry("900x700")
# Добавляем VERSION как атрибут класса
self.VERSION = VERSION
# Инициализация проверки обновлений
self.update_checker = UpdateChecker(
VERSION,
"https://gitea.filow.ru/LowaSC/ComConfigCopy"
)
# Настройка стиля
self.style = ttk.Style(self)
self.style.theme_use("clam")
default_font = ("Segoe UI", 10)
self.option_add("*Font", default_font)
self.settings = settings
self.connection = None
# Глобальные биндинги
self.bind_class("Text", "<Control-c>", lambda event: event.widget.event_generate("<<Copy>>"))
self.bind_class("Text", "<Control-v>", lambda event: event.widget.event_generate("<<Paste>>"))
self.bind_class("Text", "<Control-x>", lambda event: event.widget.event_generate("<<Cut>>"))
self.bind_class("Entry", "<Control-c>", lambda event: event.widget.event_generate("<<Copy>>"))
self.bind_class("Entry", "<Control-v>", lambda event: event.widget.event_generate("<<Paste>>"))
self.bind_class("Entry", "<Control-x>", lambda event: event.widget.event_generate("<<Cut>>"))
self.tftp_server = None
self.create_menu()
self.create_tabs()
@@ -550,6 +675,66 @@ class SerialAppGUI(tk.Tk):
# Проверка первого запуска
self.check_first_run()
def create_menu(self):
menubar = tk.Menu(self)
self.config(menu=menubar)
# Меню "Файл"
file_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Файл", menu=file_menu)
file_menu.add_command(label="Настройки", command=self.open_settings)
file_menu.add_separator()
file_menu.add_command(label="Выход", command=self.quit)
# Меню "Справка"
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Справка", menu=help_menu)
help_menu.add_command(label="Проверить обновления", command=self.check_for_updates)
help_menu.add_separator()
help_menu.add_command(label="О программе", command=self.open_about)
def check_for_updates(self):
"""Проверка наличия обновлений"""
def on_update_check(update_available, error):
if error:
messagebox.showerror(
"Ошибка проверки обновлений",
f"Не удалось проверить наличие обновлений:\n{error}"
)
elif update_available:
release_info = self.update_checker.get_release_notes()
if release_info:
response = messagebox.askyesno(
"Доступно обновление",
f"Доступна новая версия {release_info['version']}!\n\n"
f"Изменения:\n{release_info['description']}\n\n"
"Хотите перейти на страницу загрузки?",
)
if response:
webbrowser.open(release_info["download_url"])
else:
messagebox.showerror(
"Ошибка",
"Не удалось получить информацию о новой версии"
)
else:
messagebox.showinfo(
"Проверка обновлений",
"У вас установлена последняя версия программы"
)
self.update_checker.check_updates(callback=on_update_check)
def on_settings_changed(self):
"""Обработчик изменения настроек"""
self.settings = settings_load()
def open_settings(self):
"""Открытие окна настроек"""
settings_window = SettingsWindow(self, self.settings, self.on_settings_changed)
settings_window.transient(self)
settings_window.grab_set()
def check_first_run(self):
"""Проверка первого запуска программы"""
show_settings = False
@@ -580,31 +765,6 @@ class SerialAppGUI(tk.Tk):
if response:
self.open_settings()
def create_menu(self):
menubar = tk.Menu(self)
self.config(menu=menubar)
# Меню "Файл"
file_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Файл", menu=file_menu)
file_menu.add_command(label="Настройки", command=self.open_settings)
file_menu.add_separator()
file_menu.add_command(label="Выход", command=self.quit)
# Меню "Справка"
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Справка", menu=help_menu)
help_menu.add_command(label="О программе", command=self.open_about)
def open_settings(self):
settings_window = SettingsWindow(self, self.settings, self.on_settings_changed)
settings_window.transient(self)
settings_window.grab_set()
def on_settings_changed(self):
# Обновляем настройки в основном приложении
self.settings = settings_load()
def create_tabs(self):
self.notebook = ttk.Notebook(self)
self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5)
@@ -613,14 +773,17 @@ class SerialAppGUI(tk.Tk):
interactive_frame = ttk.Frame(self.notebook)
file_exec_frame = ttk.Frame(self.notebook)
config_editor_frame = ttk.Frame(self.notebook)
tftp_frame = ttk.Frame(self.notebook)
self.notebook.add(interactive_frame, text="Интерактивный режим")
self.notebook.add(file_exec_frame, text="Выполнение файла")
self.notebook.add(config_editor_frame, text="Редактор конфигурации")
self.notebook.add(tftp_frame, text="TFTP Сервер")
self.create_interactive_tab(interactive_frame)
self.create_file_exec_tab(file_exec_frame)
self.create_config_editor_tab(config_editor_frame)
self.create_tftp_tab(tftp_frame)
# -------------- Вкладка "Интерактивный режим" --------------
def create_interactive_tab(self, frame):
@@ -629,13 +792,13 @@ class SerialAppGUI(tk.Tk):
ttk.Button(control_frame, text="Подключиться", command=self.connect_device).pack(side=LEFT, padx=5)
ttk.Button(control_frame, text="Отключиться", command=self.disconnect_device).pack(side=LEFT, padx=5)
self.interactive_text = tk.Text(frame, wrap="word", height=20)
self.interactive_text = CustomText(frame, wrap="word", height=20)
self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
input_frame = ttk.Frame(frame)
input_frame.pack(fill=X, pady=5)
ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5)
self.command_entry = ttk.Entry(input_frame, width=50)
self.command_entry = CustomEntry(input_frame, width=50)
self.command_entry.pack(side=LEFT, padx=5)
ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5)
@@ -726,10 +889,10 @@ class SerialAppGUI(tk.Tk):
file_frame.pack(fill=X, pady=5)
ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
self.file_exec_var = StringVar(value=self.settings.get("config_file") or "")
ttk.Entry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5)
CustomEntry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5)
ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5)
ttk.Button(frame, text="Выполнить команды", command=self.execute_file_commands).pack(pady=5)
self.file_exec_text = tk.Text(frame, wrap="word", height=15)
self.file_exec_text = CustomText(frame, wrap="word", height=15)
self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
def select_config_file_fileexec(self):
@@ -775,11 +938,11 @@ class SerialAppGUI(tk.Tk):
top_frame.pack(fill=X, pady=5)
ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
self.editor_file_var = StringVar(value=self.settings.get("config_file") or "")
ttk.Entry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5)
CustomEntry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5)
self.config_editor_text = tk.Text(frame, wrap="word")
self.config_editor_text = CustomText(frame, wrap="word")
self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
def select_config_file_editor(self):
@@ -823,81 +986,270 @@ class SerialAppGUI(tk.Tk):
about_window.transient(self)
about_window.grab_set()
# ==========================
# Парсер аргументов (не используется)
# ==========================
# def parse_arguments():
# parser = argparse.ArgumentParser(
# description="Программа для работы с устройствами через последовательный порт с графическим интерфейсом."
# )
# parser.add_argument("--cli", action="store_true", help="Запустить в режиме командной строки (без графики)")
# return parser.parse_args()
def create_tftp_tab(self, frame):
"""Создание вкладки TFTP сервера."""
# Создаем фрейм для управления TFTP сервером
tftp_frame = ttk.Frame(frame)
tftp_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
# ==========================
# Режим командной строки (не используется)
# ==========================
# Создаем и размещаем элементы управления
controls_frame = ttk.LabelFrame(tftp_frame, text="Управление TFTP сервером")
controls_frame.pack(fill=X, padx=5, pady=5)
# def run_cli_mode(settings):
# print("Запущен режим командной строки.")
# while True:
# print("\n--- Главное меню ---")
# print("1. Подключиться и войти в интерактивный режим")
# print("2. Выполнить команды из файла конфигурации")
# print("0. Выход")
# choice = input("Выберите действие: ").strip()
# if choice == "1":
# if not settings.get("port"):
# print("[ERROR] COM-порт не выбран!")
# continue
# conn = create_connection(settings)
# if not conn:
# print("[ERROR] Не удалось установить соединение.")
# continue
# print("[INFO] Введите команды (для выхода введите _exit):")
# while True:
# cmd = input("Команда: ")
# if cmd.strip().lower() == "_exit":
# break
# try:
# conn.write((cmd + "\n").encode())
# response = read_response(
# conn, settings.get("timeout", 10),
# login=settings.get("login"),
# password=settings.get("password"),
# is_gui=False
# )
# print(response)
# except Exception as e:
# print(f"[ERROR] {e}")
# break
# conn.close()
# elif choice == "2":
# if not settings.get("config_file"):
# print("[ERROR] Файл конфигурации не выбран!")
# continue
# if not settings.get("port"):
# print("[ERROR] COM-порт не выбран!")
# continue
# conn = create_connection(settings)
# if conn:
# execute_commands_from_file(
# conn,
# settings["config_file"],
# settings.get("timeout", 10),
# settings.get("copy_mode", "line"),
# settings.get("block_size", 15),
# lambda msg: print(msg),
# login=settings.get("login"),
# password=settings.get("password"),
# is_gui=False,
# )
# conn.close()
# else:
# print("[ERROR] Не удалось установить соединение.")
# elif choice == "0":
# break
# else:
# print("[ERROR] Некорректный выбор.")
# IP адрес
ip_frame = ttk.Frame(controls_frame)
ip_frame.pack(fill=X, padx=5, pady=2)
ttk.Label(ip_frame, text="IP адрес:").pack(side=LEFT, padx=5)
self.tftp_ip_var = StringVar(value="0.0.0.0")
self.tftp_ip_combo = ttk.Combobox(ip_frame, textvariable=self.tftp_ip_var, state="readonly")
self.tftp_ip_combo.pack(side=LEFT, fill=X, expand=True, padx=5)
ttk.Button(ip_frame, text="Обновить", command=self.update_network_adapters).pack(side=LEFT, padx=5)
# Заполняем список адаптеров
self.update_network_adapters()
# Порт
port_frame = ttk.Frame(controls_frame)
port_frame.pack(fill=X, padx=5, pady=2)
ttk.Label(port_frame, text="Порт:").pack(side=LEFT, padx=5)
self.tftp_port_var = StringVar(value="69")
self.tftp_port_entry = CustomEntry(port_frame, textvariable=self.tftp_port_var)
self.tftp_port_entry.pack(fill=X, expand=True, padx=5)
# Кнопки управления
buttons_frame = ttk.Frame(controls_frame)
buttons_frame.pack(fill=X, padx=5, pady=5)
self.start_tftp_button = ttk.Button(
buttons_frame,
text="Запустить сервер",
command=self.start_tftp_server
)
self.start_tftp_button.pack(side=LEFT, padx=5)
self.stop_tftp_button = ttk.Button(
buttons_frame,
text="Остановить сервер",
command=self.stop_tftp_server,
state="disabled"
)
self.stop_tftp_button.pack(side=LEFT, padx=5)
# Лог сервера
log_frame = ttk.LabelFrame(tftp_frame, text="Лог сервера")
log_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
self.tftp_log_text = CustomText(log_frame, wrap=tk.WORD, height=10)
self.tftp_log_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Добавляем скроллбар для лога
scrollbar = ttk.Scrollbar(self.tftp_log_text, command=self.tftp_log_text.yview)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self.tftp_log_text.config(yscrollcommand=scrollbar.set)
# Статус передач
transfers_frame = ttk.LabelFrame(tftp_frame, text="Активные передачи")
transfers_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Создаем таблицу для отображения активных передач
columns = ("client", "filename", "progress", "remaining", "time")
self.transfers_tree = ttk.Treeview(transfers_frame, columns=columns, show="headings")
# Настраиваем заголовки колонок
self.transfers_tree.heading("client", text="Клиент")
self.transfers_tree.heading("filename", text="Файл")
self.transfers_tree.heading("progress", text="Прогресс")
self.transfers_tree.heading("remaining", text="Осталось")
self.transfers_tree.heading("time", text="Время")
# Настраиваем ширину колонок
self.transfers_tree.column("client", width=120)
self.transfers_tree.column("filename", width=150)
self.transfers_tree.column("progress", width=100)
self.transfers_tree.column("remaining", width=100)
self.transfers_tree.column("time", width=80)
self.transfers_tree.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Инициализация TFTP сервера
self.tftp_server = None
self.tftp_server_thread = None
def start_tftp_server(self):
"""Запуск TFTP сервера."""
try:
# Получаем выбранный IP-адрес
ip = self.tftp_ip_var.get()
if not ip:
messagebox.showerror("Ошибка", "Выберите IP-адрес для TFTP сервера")
return
# Проверяем корректность порта
try:
port = int(self.tftp_port_var.get())
if port <= 0 or port > 65535:
raise ValueError("Порт должен быть в диапазоне 1-65535")
except ValueError as e:
messagebox.showerror("Ошибка", f"Некорректный порт: {str(e)}")
return
# Создаем экземпляр TFTP сервера
self.tftp_server = TFTPServer("Firmware")
# Устанавливаем callback для логирования
def log_callback(message):
# Фильтруем дублирующиеся сообщения о запуске/остановке сервера
if "[INFO] TFTP сервер запущен" in message and hasattr(self, '_server_started'):
return
if "[INFO] TFTP сервер остановлен" in message and hasattr(self, '_server_stopped'):
return
self.append_tftp_log(message)
# Устанавливаем флаги для отслеживания состояния
if "[INFO] TFTP сервер запущен" in message:
self._server_started = True
elif "[INFO] TFTP сервер остановлен" in message:
self._server_stopped = True
# Обновляем информацию о передачах
self.update_transfers_info()
self.tftp_server.set_log_callback(log_callback)
# Запускаем сервер в отдельном потоке
self.tftp_server_thread = threading.Thread(
target=self.run_tftp_server,
args=(ip, port),
daemon=True
)
self.tftp_server_thread.start()
# Обновляем состояние кнопок и элементов управления
self.start_tftp_button.config(state="disabled")
self.stop_tftp_button.config(state="normal")
self.tftp_ip_combo.config(state="disabled")
self.tftp_port_entry.config(state="disabled")
# Запускаем периодическое обновление информации о передачах
self.update_transfers_periodically()
except Exception as e:
self.append_tftp_log(f"[ERROR] Ошибка запуска сервера: {str(e)}")
messagebox.showerror("Ошибка", f"Не удалось запустить TFTP сервер: {str(e)}")
def run_tftp_server(self, ip, port):
"""Запуск TFTP сервера в отдельном потоке."""
try:
self.tftp_server.start_server(ip, port)
except Exception as e:
self.append_tftp_log(f"[ERROR] Ошибка работы сервера: {str(e)}")
def stop_tftp_server(self):
"""Остановка TFTP сервера."""
if self.tftp_server:
try:
# Отключаем кнопки на время остановки сервера
self.start_tftp_button.config(state="disabled")
self.stop_tftp_button.config(state="disabled")
# Сбрасываем флаги состояния
if hasattr(self, '_server_started'):
delattr(self, '_server_started')
if hasattr(self, '_server_stopped'):
delattr(self, '_server_stopped')
# Останавливаем сервер
self.tftp_server.stop_server()
# Ждем завершения потока сервера с таймаутом
if self.tftp_server_thread:
self.tftp_server_thread.join(timeout=5.0)
if self.tftp_server_thread.is_alive():
self.append_tftp_log("[WARN] Превышено время ожидания остановки сервера")
# Очищаем ссылки на сервер и поток
self.tftp_server = None
self.tftp_server_thread = None
# Обновляем состояние кнопок
self.start_tftp_button.config(state="normal")
self.stop_tftp_button.config(state="disabled")
self.tftp_ip_combo.config(state="normal")
self.tftp_port_entry.config(state="normal")
# Очищаем таблицу передач
for item in self.transfers_tree.get_children():
self.transfers_tree.delete(item)
except Exception as e:
self.append_tftp_log(f"[ERROR] Ошибка остановки сервера: {str(e)}")
messagebox.showerror("Ошибка", f"Не удалось остановить TFTP сервер: {str(e)}")
# Восстанавливаем состояние кнопок в случае ошибки
self.start_tftp_button.config(state="disabled")
self.stop_tftp_button.config(state="normal")
def append_tftp_log(self, message):
"""Добавление сообщения в лог TFTP сервера."""
self.tftp_log_text.insert(END, message + "\n")
self.tftp_log_text.see(END)
def update_transfers_info(self):
"""Обновление информации об активных передачах."""
if not self.tftp_server:
return
# Очищаем текущие записи
for item in self.transfers_tree.get_children():
self.transfers_tree.delete(item)
# Добавляем информацию о текущих передачах
for client_addr, transfer_info in self.tftp_server.active_transfers.items():
filename = transfer_info['filename']
bytes_sent = transfer_info['bytes_sent']
filesize = transfer_info['filesize']
start_time = transfer_info['start_time']
# Вычисляем прогресс
progress = f"{bytes_sent}/{filesize} байт"
remaining_bytes = filesize - bytes_sent
elapsed_time = time.time() - start_time
# Вычисляем скорость передачи (байт/сек)
if elapsed_time > 0:
transfer_speed = bytes_sent / elapsed_time
# Вычисляем оставшееся время
if transfer_speed > 0:
remaining_time = remaining_bytes / transfer_speed
remaining_str = f"{remaining_bytes} байт (~{int(remaining_time)}с)"
else:
remaining_str = f"{remaining_bytes} байт (неизвестно)"
else:
remaining_str = f"{remaining_bytes} байт (вычисляется...)"
# Добавляем запись в таблицу
self.transfers_tree.insert("", END, values=(
f"{client_addr[0]}:{client_addr[1]}",
filename,
progress,
remaining_str,
f"{elapsed_time:.1f}с"
))
def update_transfers_periodically(self):
"""Периодическое обновление информации о передачах."""
if self.tftp_server and self.tftp_server.running:
self.update_transfers_info()
# Планируем следующее обновление через 1 секунду
self.after(1000, self.update_transfers_periodically)
def update_network_adapters(self):
"""Обновление списка сетевых адаптеров."""
adapters = get_network_adapters()
self.tftp_ip_combo["values"] = adapters
if not self.tftp_ip_var.get() in adapters:
self.tftp_ip_var.set(adapters[0])
# ==========================
# Основной запуск приложения

44
README.md Normal file
View File

@@ -0,0 +1,44 @@
# ComConfigCopy
Программа для копирования конфигураций на коммутаторы.
## Описание
ComConfigCopy - это утилита, разработанная для автоматизации процесса копирования конфигураций на сетевые коммутаторы. Программа предоставляет удобный графический интерфейс для управления процессом копирования и настройки параметров подключения.
## Основные возможности
- Копирование конфигураций на коммутаторы через COM-порт
- Поддержка различных скоростей подключения
- Автоматическое определение доступных COM-портов
- Возможность сохранения и загрузки настроек
- Автоматическое обновление через GitHub
## Системные требования
- Windows 7/8/10/11
- Python 3.8 или выше
- Доступ к COM-портам
## Установка
1. Скачайте последнюю версию программы из [репозитория](https://gitea.filow.ru/LowaSC/ComConfigCopy/releases)
2. Распакуйте архив в удобное место
3. Запустите файл `ComConfigCopy.exe`
## Использование
1. Выберите COM-порт из списка доступных
2. Настройте параметры подключения (скорость, биты данных и т.д.)
3. Выберите файл конфигурации для отправки
4. Нажмите кнопку "Отправить" для начала процесса копирования
## Контакты
- Email: LowaWorkMail@gmail.com
- Telegram: [@LowaSC](https://t.me/LowaSC)
- Репозиторий: [ComConfigCopy](https://gitea.filow.ru/LowaSC/ComConfigCopy)
## Лицензия
Этот проект распространяется под лицензией MIT. Подробности смотрите в файле [LICENSE](LICENSE).

349
TFTPServer.py Normal file
View File

@@ -0,0 +1,349 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TFTP сервер для передачи прошивки с компьютера на коммутатор.
- Создает сервер по заданному IP и порту.
- Расшаривает папку Firmware.
- Показывает текущее состояние сервера и статус передачи файла:
- кому (IP устройства),
- сколько осталось байт,
- сколько передано байт,
- время передачи.
"""
import os
import socket
import struct
import threading
import time
class TFTPServer:
def __init__(self, share_folder):
"""
Инициализация TFTP сервера.
:param share_folder: Путь к папке, содержащей файлы (например, папка 'Firmware')
"""
self.share_folder = share_folder
self.log_callback = None
self.running = False
self.server_socket = None
self.lock = threading.Lock()
self.transfer_sockets = set() # Множество для хранения всех активных сокетов передачи
# Словарь активных передач для мониторинга их статуса.
# Ключ адрес клиента, значение словарь с информацией о передаче.
self.active_transfers = {}
def set_log_callback(self, callback):
"""
Установка функции обратного вызова для логирования сообщений.
:param callback: Функция, принимающая строку сообщения.
"""
self.log_callback = callback
def log(self, message):
"""
Функция логирования: вызывает callback (если задан) или выводит сообщение в консоль.
:param message: Строка с сообщением для логирования.
"""
if self.log_callback:
self.log_callback(message)
else:
print(message)
def start_server(self, ip, port):
"""
Запуск TFTP сервера на указанном IP и порту.
:param ip: IP-адрес для привязки сервера.
:param port: Порт для TFTP сервера.
"""
if self.running:
self.log("[WARN] Сервер уже запущен")
return
self.running = True
try:
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.server_socket.bind((ip, port))
self.log(f"[INFO] TFTP сервер запущен на {ip}:{port}")
while self.running:
try:
self.server_socket.settimeout(1.0)
data, client_addr = self.server_socket.recvfrom(2048)
if data and self.running:
threading.Thread(target=self.handle_request, args=(data, client_addr), daemon=True).start()
except socket.timeout:
continue
except socket.error as e:
if self.running: # Логируем ошибку только если сервер еще запущен
self.log(f"[ERROR] Ошибка получения данных: {str(e)}")
break
except Exception as e:
if self.running: # Логируем ошибку только если сервер еще запущен
self.log(f"[ERROR] Ошибка получения данных: {str(e)}")
break
except Exception as e:
self.log(f"[ERROR] Ошибка запуска сервера: {str(e)}")
finally:
self.running = False
if self.server_socket:
try:
self.server_socket.close()
except:
pass
self.server_socket = None
def stop_server(self):
"""
Остановка TFTP сервера.
"""
if not self.running:
return
self.log("[INFO] Остановка TFTP сервера...")
self.running = False
try:
# Закрываем основной сокет сервера первым
if self.server_socket:
try:
# Создаем временный сокет и отправляем пакет самому себе,
# чтобы разблокировать recvfrom
temp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
server_address = self.server_socket.getsockname()
temp_socket.sendto(b'', server_address)
except:
pass
finally:
try:
temp_socket.close()
except:
pass
try:
self.server_socket.close()
except Exception as e:
self.log(f"[WARN] Ошибка при закрытии основного сокета: {str(e)}")
finally:
self.server_socket = None
# Закрываем все активные сокеты передачи
with self.lock:
active_sockets = list(self.transfer_sockets)
self.transfer_sockets.clear()
active_transfers = dict(self.active_transfers)
self.active_transfers.clear()
# Закрываем сокеты передачи после очистки множества
for sock in active_sockets:
try:
if sock:
sock.close()
except Exception as e:
self.log(f"[WARN] Ошибка при закрытии сокета передачи: {str(e)}")
# Отправляем сообщения об остановке для активных передач
for client_addr, transfer_info in active_transfers.items():
try:
self.send_error(client_addr, 0, "Сервер остановлен")
except:
pass
except Exception as e:
self.log(f"[ERROR] Ошибка при остановке сервера: {str(e)}")
finally:
self.running = False # Гарантируем, что флаг running будет False
self.log("[INFO] TFTP сервер остановлен")
def handle_request(self, data, client_addr):
"""
Обработка входящего запроса от клиента.
:param data: Полученные данные (UDP-пакет).
:param client_addr: Адрес клиента, отправившего пакет.
"""
if len(data) < 2:
self.log(f"[WARN] Получен некорректный пакет от {client_addr}")
return
opcode = struct.unpack("!H", data[:2])[0]
if opcode == 1: # RRQ (Read Request) запрос на чтение файла
self.handle_rrq(data, client_addr)
else:
self.log(f"[WARN] Неподдерживаемый запрос (опкод {opcode}) от {client_addr}")
def handle_rrq(self, data, client_addr):
"""
Обработка запроса на чтение файла (RRQ).
:param data: Данные запроса.
:param client_addr: Адрес клиента.
"""
try:
# RRQ формата: 2 байта опкода, затем строка имени файла, за которой следует 0,
# затем строка режима (например, "octet"), и завершается 0.
parts = data[2:].split(b'\0')
if len(parts) < 2:
self.log(f"[WARN] Некорректный RRQ пакет от {client_addr}")
return
filename = parts[0].decode('utf-8')
mode = parts[1].decode('utf-8').lower()
self.log(f"[INFO] Получен RRQ от {client_addr}: файл '{filename}', режим '{mode}'")
if mode != "octet":
self.send_error(client_addr, 0, "Поддерживается только octet режим")
return
file_path = os.path.join(self.share_folder, filename)
if not os.path.isfile(file_path):
self.send_error(client_addr, 1, "Файл не найден")
return
# Запускаем передачу файла в новом потоке.
threading.Thread(target=self.send_file, args=(file_path, client_addr), daemon=True).start()
except Exception as e:
self.log(f"[ERROR] Ошибка обработки RRQ: {str(e)}")
def send_error(self, client_addr, error_code, error_message):
"""
Отправка сообщения об ошибке клиенту.
:param client_addr: Адрес клиента.
:param error_code: Код ошибки.
:param error_message: Текст ошибки.
"""
# Формируем TFTP пакет ошибки: 2 байта опкода (5), 2 байта кода ошибки, сообщение об ошибке и завершающий 0.
packet = struct.pack("!HH", 5, error_code) + error_message.encode('utf-8') + b'\0'
self.server_socket.sendto(packet, client_addr)
self.log(f"[INFO] Отправлено сообщение об ошибке '{error_message}' клиенту {client_addr}")
def send_file(self, file_path, client_addr):
"""
Передача файла клиенту по протоколу TFTP.
"""
BLOCK_SIZE = 512
MAX_RETRIES = 5
TIMEOUT = 2.0
transfer_socket = None
try:
if not os.path.exists(file_path):
self.log(f"[ERROR] Файл '{file_path}' не существует")
self.send_error(client_addr, 1, "Файл не найден")
return
filesize = os.path.getsize(file_path)
if filesize == 0:
self.log(f"[ERROR] Файл '{file_path}' пуст")
self.send_error(client_addr, 0, "Файл пуст")
return
start_time = time.time()
file_basename = os.path.basename(file_path)
# Регистрируем активную передачу
with self.lock:
self.active_transfers[client_addr] = {
'filename': file_basename,
'filesize': filesize,
'bytes_sent': 0,
'start_time': start_time
}
# Создаем новый сокет для передачи данных
transfer_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
transfer_socket.settimeout(TIMEOUT)
with self.lock:
self.transfer_sockets.add(transfer_socket)
self.log(f"[INFO] Начало передачи файла '{file_basename}' клиенту {client_addr}. Размер файла: {filesize} байт.")
with open(file_path, 'rb') as file:
block_number = 1
last_successful_block = 0
while True:
# Читаем блок данных
data = file.read(BLOCK_SIZE)
# Формируем и отправляем пакет данных
packet = struct.pack('!HH', 3, block_number) + data
retries = 0
while retries < MAX_RETRIES:
try:
transfer_socket.sendto(packet, client_addr)
# Ожидаем подтверждение
while True:
try:
ack_data, ack_addr = transfer_socket.recvfrom(4)
if ack_addr == client_addr and len(ack_data) >= 4:
opcode, ack_block = struct.unpack('!HH', ack_data)
if opcode == 4: # ACK
if ack_block == block_number:
# Успешное подтверждение
last_successful_block = block_number
bytes_sent = min((block_number * BLOCK_SIZE), filesize)
# Обновляем информацию о прогрессе
with self.lock:
if client_addr in self.active_transfers:
self.active_transfers[client_addr]['bytes_sent'] = bytes_sent
# Логируем статус каждую секунду
current_time = time.time()
if current_time - start_time >= 1.0:
bytes_remaining = filesize - bytes_sent
elapsed_time = current_time - start_time
self.log(f"[STATUS] Клиент: {client_addr} | Файл: {file_basename} | "
f"Отправлено: {bytes_sent}/{filesize} байт | "
f"Осталось: {bytes_remaining} байт | "
f"Время: {elapsed_time:.2f} сек.")
break
elif ack_block < block_number:
# Получен старый ACK, игнорируем
continue
except socket.timeout:
break
if last_successful_block == block_number:
break
else:
retries += 1
self.log(f"[WARN] Таймаут ожидания ACK для блока {block_number} от {client_addr}. "
f"Попытка {retries + 1}.")
except Exception as e:
retries += 1
self.log(f"[ERROR] Ошибка при передаче блока {block_number}: {str(e)}")
if retries >= MAX_RETRIES:
self.log(f"[ERROR] Превышено максимальное количество попыток для блока {block_number}")
return
block_number += 1
# Если отправили меньше BLOCK_SIZE байт, это был последний блок
if len(data) < BLOCK_SIZE:
break
self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} завершена успешно")
except Exception as e:
self.log(f"[ERROR] Ошибка при передаче файла: {str(e)}")
finally:
# Очищаем информацию о передаче
with self.lock:
if client_addr in self.active_transfers:
del self.active_transfers[client_addr]
if transfer_socket in self.transfer_sockets:
self.transfer_sockets.remove(transfer_socket)
if transfer_socket:
try:
transfer_socket.close()
except:
pass

View File

@@ -2,19 +2,22 @@
# -*- coding: utf-8 -*-
import tkinter as tk
from tkinter import ttk, BOTH, X, BOTTOM
from tkinter import ttk, BOTH, X, BOTTOM, END
import webbrowser
class AboutWindow(tk.Toplevel):
def __init__(self, parent):
super().__init__(parent)
self.title("О программе")
self.geometry("400x300")
self.geometry("600x500")
self.resizable(False, False)
# Создаем фрейм
# Сохраняем ссылку на родительское окно
self.parent = parent
# Создаем фрейм для содержимого
about_frame = ttk.Frame(self, padding="20")
about_frame.pack(fill=BOTH, expand=True)
about_frame.pack(fill=BOTH, expand=True, padx=10, pady=10)
# Заголовок
ttk.Label(
@@ -33,7 +36,7 @@ class AboutWindow(tk.Toplevel):
# Версия
ttk.Label(
about_frame,
text="Версия 1.0",
text=f"Версия {getattr(parent, 'VERSION', '1.0.0')}",
font=("Segoe UI", 10)
).pack(pady=(0, 20))
@@ -80,10 +83,10 @@ class AboutWindow(tk.Toplevel):
# Кнопка закрытия
ttk.Button(
about_frame,
self,
text="Закрыть",
command=self.destroy
).pack(side=BOTTOM, pady=(20, 0))
).pack(side=BOTTOM, pady=10)
# Центрируем окно
self.center_window()

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
tftpy>=0.8.0
pyserial>=3.5
requests>=2.31.0
packaging>=23.2

175
update_checker.py Normal file
View File

@@ -0,0 +1,175 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import logging
import requests
import threading
from packaging import version
class UpdateCheckError(Exception):
"""Исключение для ошибок проверки обновлений"""
pass
class UpdateChecker:
"""Класс для проверки обновлений программы"""
def __init__(self, current_version, repo_url):
self.current_version = current_version
self.repo_url = repo_url
# Формируем базовый URL API
self.api_url = repo_url.replace("gitea.filow.ru", "gitea.filow.ru/api/v1/repos/LowaSC/ComConfigCopy")
self._update_available = False
self._latest_version = None
self._latest_release = None
self._error = None
self._changelog = None
def get_changelog(self, callback=None):
"""
Получение changelog из репозитория.
:param callback: Функция обратного вызова, которая будет вызвана после получения changelog
"""
def fetch():
try:
# Пытаемся получить CHANGELOG.md из репозитория
response = requests.get(f"{self.api_url}/contents/CHANGELOG.md", timeout=10)
response.raise_for_status()
content = response.json()
if "content" in content:
import base64
changelog_content = base64.b64decode(content["content"]).decode("utf-8")
self._changelog = changelog_content
self._error = None
else:
raise UpdateCheckError("Не удалось получить содержимое CHANGELOG.md")
except requests.RequestException as e:
error_msg = f"Ошибка получения changelog: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
self._changelog = None
except Exception as e:
error_msg = f"Неизвестная ошибка при получении changelog: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
self._changelog = None
finally:
if callback:
callback(self._changelog, self._error)
# Запускаем получение в отдельном потоке
threading.Thread(target=fetch, daemon=True).start()
def check_updates(self, callback=None):
"""
Проверка наличия обновлений.
:param callback: Функция обратного вызова, которая будет вызвана после проверки
"""
def check():
try:
response = requests.get(f"{self.api_url}/releases", timeout=10)
response.raise_for_status()
releases = response.json()
if not releases:
raise UpdateCheckError("Не найдено релизов в репозитории")
latest_release = releases[0]
latest_version = latest_release.get("tag_name", "").lstrip("v")
if not latest_version:
raise UpdateCheckError("Не удалось определить версию последнего релиза")
try:
if version.parse(latest_version) > version.parse(self.current_version):
self._update_available = True
self._latest_version = latest_version
self._latest_release = latest_release
logging.info(f"Доступно обновление: {latest_version}")
else:
logging.info("Обновления не требуются")
except version.InvalidVersion as e:
raise UpdateCheckError(f"Некорректный формат версии: {e}")
self._error = None
except requests.RequestException as e:
error_msg = f"Ошибка сетевого подключения: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
except UpdateCheckError as e:
logging.error(str(e), exc_info=True)
self._error = str(e)
except Exception as e:
error_msg = f"Неизвестная ошибка при проверке обновлений: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
finally:
if callback:
callback(self._update_available, self._error)
@property
def update_available(self):
"""Доступно ли обновление"""
return self._update_available
@property
def latest_version(self):
"""Последняя доступная версия"""
return self._latest_version
@property
def error(self):
"""Последняя ошибка при проверке обновлений"""
return self._error
@property
def changelog(self):
"""Текущий changelog"""
return self._changelog
def get_release_notes(self):
"""Получение информации о последнем релизе"""
if self._latest_release:
return {
"version": self._latest_version,
"description": self._latest_release.get("body", ""),
"download_url": self._latest_release.get("assets", [{}])[0].get("browser_download_url", "")
}
return None
def get_releases(self, callback=None):
"""
Получение списка релизов из репозитория.
:param callback: Функция обратного вызова, которая будет вызвана после получения списка релизов
"""
def fetch():
try:
response = requests.get(f"{self.api_url}/releases", timeout=10)
response.raise_for_status()
releases = response.json()
if not releases:
raise UpdateCheckError("Не найдено релизов в репозитории")
self._error = None
if callback:
callback(releases, None)
except requests.RequestException as e:
error_msg = f"Ошибка сетевого подключения: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
if callback:
callback(None, error_msg)
except Exception as e:
error_msg = f"Ошибка при получении списка релизов: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
if callback:
callback(None, error_msg)
# Запускаем получение в отдельном потоке
threading.Thread(target=fetch, daemon=True).start()