13 Commits

Author SHA1 Message Date
2f4b2985cd Remove documentation and CLI mode features
- Remove "Documentation" menu option
- Delete unused CLI mode code
- Clean up commented-out argument parsing function
2025-02-16 05:00:43 +03:00
d1a870fed7 Add update checking and documentation features
- Implement UpdateChecker for version comparison and update notifications
- Add menu options for documentation and update checking
- Enhance AboutWindow with dynamic version display
- Update requirements.txt with new dependencies
- Create infrastructure for opening local documentation
- Improve application menu with additional help options
2025-02-16 04:50:33 +03:00
2e2dd9e705 Add custom text and entry widgets with enhanced copy/paste functionality
- Implement CustomText and CustomEntry classes with advanced text interaction features
- Add context menu for text widgets with cut, copy, paste, and select all options
- Support multiple keyboard shortcuts for text manipulation
- Replace standard Tkinter Text and Entry widgets with custom implementations
- Remove global text/entry widget bindings in favor of class-specific methods
2025-02-16 03:57:48 +03:00
d937042ea2 Update application title to reflect project name
- Change window title from "Serial Device Manager" to "ComConfigCopy"
2025-02-16 03:53:21 +03:00
136c7877d3 Refactor TFTP file transfer with improved reliability and error handling
- Implement more robust file transfer mechanism with configurable retry and timeout settings
- Add detailed logging for transfer progress and error scenarios
- Enhance block transfer logic with better error recovery
- Simplify transfer socket management and cleanup process
- Improve overall transfer reliability and error tracking
2025-02-16 03:50:27 +03:00
467d582095 Improve file transfer progress tracking and display
- Add dynamic transfer speed calculation
- Compute and display estimated remaining transfer time
- Enhance remaining bytes display with more informative status
- Update transfers table with more detailed transfer progress information
2025-02-16 03:43:34 +03:00
16526b4643 Merge pull request 'TFTP' (#3) from TFTP into main
Reviewed-on: #3
2025-02-16 00:39:43 +00:00
6d2819a860 Add network adapter selection for TFTP server
- Implement `get_network_adapters()` function to dynamically retrieve available network interfaces
- Replace IP address entry with a combobox for network adapter selection
- Add "Update" button to refresh network adapter list
- Improve IP address validation and error handling for TFTP server configuration
- Enhance UI with more user-friendly network interface selection
2025-02-16 03:34:57 +03:00
a252a0f153 Prevent duplicate TFTP server log messages and improve state tracking
- Add filtering mechanism to prevent repeated server start/stop log entries
- Implement state tracking flags to manage server status
- Remove redundant log messages in both ComConfigCopy.py and TFTPServer.py
- Enhance log callback to avoid unnecessary logging of server state changes
2025-02-16 03:31:32 +03:00
3126811f09 Improve TFTP server shutdown and error handling
- Enhance server stop mechanism with more robust socket and thread management
- Add better handling of active transfers during server shutdown
- Implement additional safety checks and timeout handling
- Improve logging and error reporting for server stop process
- Prevent potential deadlocks and resource leaks during server termination
2025-02-16 03:28:53 +03:00
f1ca31c198 Enhance TFTP server implementation with advanced monitoring and UI improvements
- Completely refactor TFTP server implementation with more robust file transfer handling
- Add detailed transfer tracking with active transfers table
- Implement periodic transfer status updates
- Improve log and UI layout for TFTP server tab
- Add more granular error handling and logging
- Enhance threading and socket management for file transfers
2025-02-16 03:19:44 +03:00
c95915483f Update .gitignore to include virtual environment directory
- Add '.venv/' to .gitignore to exclude Python virtual environment files
2025-02-16 03:09:23 +03:00
299ce329f7 Add TFTP server functionality to the application
- Implement TFTP server tab with IP and port configuration
- Create methods to start and stop TFTP server
- Add logging functionality for TFTP server events
- Integrate TFTPServer class into the main application
- Re-enable Firmware directory creation
2025-02-16 02:51:47 +03:00
6 changed files with 1072 additions and 145 deletions

View File

@@ -17,6 +17,7 @@ import re
import sys import sys
import threading import threading
import time import time
import webbrowser
from getpass import getpass from getpass import getpass
from logging.handlers import RotatingFileHandler from logging.handlers import RotatingFileHandler
import tkinter as tk import tkinter as tk
@@ -37,13 +38,20 @@ import serial
import serial.tools.list_ports import serial.tools.list_ports
from serial.serialutil import SerialException from serial.serialutil import SerialException
from about_window import AboutWindow from about_window import AboutWindow
from TFTPServer import TFTPServer
# from TFTPServer import TFTPServerThread # from TFTPServer import TFTPServerThread
import socket
from update_checker import UpdateChecker
# Версия программы
VERSION = "1.0.0"
# Создаем необходимые папки # Создаем необходимые папки
os.makedirs("Logs", exist_ok=True) os.makedirs("Logs", exist_ok=True)
os.makedirs("Configs", exist_ok=True) os.makedirs("Configs", exist_ok=True)
os.makedirs("Settings", exist_ok=True) os.makedirs("Settings", exist_ok=True)
# os.makedirs("Firmware", exist_ok=True) os.makedirs("Firmware", exist_ok=True)
os.makedirs("docs", exist_ok=True)
# Файл настроек находится в папке Settings # Файл настроек находится в папке Settings
SETTINGS_FILE = os.path.join("Settings", "settings.json") SETTINGS_FILE = os.path.join("Settings", "settings.json")
@@ -137,25 +145,37 @@ def list_serial_ports():
# Функции работы с сетевыми адаптерами (не используются) # Функции работы с сетевыми адаптерами (не используются)
# ========================== # ==========================
# def list_network_adapters(): def get_network_adapters():
# """Возвращает список названий сетевых адаптеров (Windows).""" """Получение списка сетевых адаптеров и их IP-адресов."""
# adapters = [] adapters = []
# if platform.system() == "Windows": try:
# try: # Получаем имя хоста
# output = subprocess.check_output( hostname = socket.gethostname()
# 'wmic nic get NetConnectionID', # Получаем все адреса для данного хоста
# shell=True, addresses = socket.getaddrinfo(hostname, None)
# encoding="cp866"
# ) # Создаем множество для хранения уникальных IP-адресов
# for line in output.splitlines(): unique_ips = set()
# line = line.strip()
# if line and line != "NetConnectionID": for addr in addresses:
# adapters.append(line) ip = addr[4][0]
# except Exception as e: # Пропускаем IPv6 и локальные адреса
# logging.error(f"Ошибка при получении списка адаптеров: {e}", exc_info=True) if ':' not in ip and not ip.startswith('127.'):
# else: unique_ips.add(ip)
# adapters = ["eth0"]
# return adapters # Добавляем все найденные IP-адреса в список
for ip in sorted(unique_ips):
adapters.append(f"{ip}")
# Добавляем 0.0.0.0 для прослушивания всех интерфейсов
adapters.insert(0, "0.0.0.0")
except Exception as e:
logging.error(f"Ошибка при получении списка сетевых адаптеров: {e}", exc_info=True)
# В случае ошибки возвращаем хотя бы 0.0.0.0
adapters = ["0.0.0.0"]
return adapters
# ========================== # ==========================
# Функции работы с COM-соединением # Функции работы с COM-соединением
@@ -423,9 +443,109 @@ def execute_commands_from_file(
logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True) logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True)
# ========================== # ==========================
# Графический интерфейс (Tkinter) # Улучшенные текстовые виджеты
# ========================== # ==========================
class CustomText(tk.Text):
"""Улучшенный текстовый виджет с расширенной функциональностью копирования/вставки"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.create_context_menu()
self.bind_shortcuts()
def create_context_menu(self):
self.context_menu = tk.Menu(self, tearoff=0)
self.context_menu.add_command(label="Вырезать", command=self.cut)
self.context_menu.add_command(label="Копировать", command=self.copy)
self.context_menu.add_command(label="Вставить", command=self.paste)
self.context_menu.add_separator()
self.context_menu.add_command(label="Выделить всё", command=self.select_all)
self.bind("<Button-3>", self.show_context_menu)
def show_context_menu(self, event):
self.context_menu.post(event.x_root, event.y_root)
def bind_shortcuts(self):
# Стандартные сочетания
self.bind("<Control-x>", lambda e: self.event_generate("<<Cut>>"))
self.bind("<Control-c>", lambda e: self.event_generate("<<Copy>>"))
self.bind("<Control-v>", lambda e: self.event_generate("<<Paste>>"))
self.bind("<Control-a>", self.select_all)
# Shift+Insert для вставки
self.bind("<Shift-Insert>", lambda e: self.event_generate("<<Paste>>"))
# Ctrl+Insert для копирования
self.bind("<Control-Insert>", lambda e: self.event_generate("<<Copy>>"))
# Shift+Delete для вырезания
self.bind("<Shift-Delete>", lambda e: self.event_generate("<<Cut>>"))
def cut(self):
self.event_generate("<<Cut>>")
def copy(self):
self.event_generate("<<Copy>>")
def paste(self):
self.event_generate("<<Paste>>")
def select_all(self, event=None):
self.tag_add(tk.SEL, "1.0", tk.END)
self.mark_set(tk.INSERT, "1.0")
self.see(tk.INSERT)
return "break"
class CustomEntry(ttk.Entry):
"""Улучшенное поле ввода с расширенной функциональностью копирования/вставки"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.create_context_menu()
self.bind_shortcuts()
def create_context_menu(self):
self.context_menu = tk.Menu(self, tearoff=0)
self.context_menu.add_command(label="Вырезать", command=self.cut)
self.context_menu.add_command(label="Копировать", command=self.copy)
self.context_menu.add_command(label="Вставить", command=self.paste)
self.context_menu.add_separator()
self.context_menu.add_command(label="Выделить всё", command=self.select_all)
self.bind("<Button-3>", self.show_context_menu)
def show_context_menu(self, event):
self.context_menu.post(event.x_root, event.y_root)
def bind_shortcuts(self):
# Стандартные сочетания
self.bind("<Control-x>", lambda e: self.event_generate("<<Cut>>"))
self.bind("<Control-c>", lambda e: self.event_generate("<<Copy>>"))
self.bind("<Control-v>", lambda e: self.event_generate("<<Paste>>"))
self.bind("<Control-a>", self.select_all)
# Shift+Insert для вставки
self.bind("<Shift-Insert>", lambda e: self.event_generate("<<Paste>>"))
# Ctrl+Insert для копирования
self.bind("<Control-Insert>", lambda e: self.event_generate("<<Copy>>"))
# Shift+Delete для вырезания
self.bind("<Shift-Delete>", lambda e: self.event_generate("<<Cut>>"))
def cut(self):
self.event_generate("<<Cut>>")
def copy(self):
self.event_generate("<<Copy>>")
def paste(self):
self.event_generate("<<Paste>>")
def select_all(self, event=None):
self.select_range(0, tk.END)
return "break"
class SettingsWindow(tk.Toplevel): class SettingsWindow(tk.Toplevel):
def __init__(self, parent, settings, callback=None): def __init__(self, parent, settings, callback=None):
super().__init__(parent) super().__init__(parent)
@@ -472,13 +592,13 @@ class SettingsWindow(tk.Toplevel):
# Размер блока # Размер блока
ttk.Label(settings_frame, text="Размер блока:").grid(row=4, column=0, sticky=W, pady=5) ttk.Label(settings_frame, text="Размер блока:").grid(row=4, column=0, sticky=W, pady=5)
self.block_size_var = StringVar(value=str(settings.get("block_size", 15))) self.block_size_var = StringVar(value=str(settings.get("block_size", 15)))
block_size_entry = ttk.Entry(settings_frame, textvariable=self.block_size_var) block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var)
block_size_entry.grid(row=4, column=1, sticky=W, pady=5) block_size_entry.grid(row=4, column=1, sticky=W, pady=5)
# Приглашение командной строки # Приглашение командной строки
ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5) ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5)
self.prompt_var = StringVar(value=settings.get("prompt", ">")) self.prompt_var = StringVar(value=settings.get("prompt", ">"))
prompt_entry = ttk.Entry(settings_frame, textvariable=self.prompt_var) prompt_entry = CustomEntry(settings_frame, textvariable=self.prompt_var)
prompt_entry.grid(row=5, column=1, sticky=W, pady=5) prompt_entry.grid(row=5, column=1, sticky=W, pady=5)
# Кнопки # Кнопки
@@ -527,29 +647,94 @@ class SettingsWindow(tk.Toplevel):
class SerialAppGUI(tk.Tk): class SerialAppGUI(tk.Tk):
def __init__(self, settings): def __init__(self, settings):
super().__init__() super().__init__()
self.title("Serial Device Manager") self.title("ComConfigCopy")
self.geometry("900x700") self.geometry("900x700")
# Добавляем VERSION как атрибут класса
self.VERSION = VERSION
# Инициализация проверки обновлений
self.update_checker = UpdateChecker(
VERSION,
"https://gitea.filow.ru/LowaSC/ComConfigCopy"
)
# Настройка стиля
self.style = ttk.Style(self) self.style = ttk.Style(self)
self.style.theme_use("clam") self.style.theme_use("clam")
default_font = ("Segoe UI", 10) default_font = ("Segoe UI", 10)
self.option_add("*Font", default_font) self.option_add("*Font", default_font)
self.settings = settings self.settings = settings
self.connection = None self.connection = None
self.tftp_server = None
# Глобальные биндинги
self.bind_class("Text", "<Control-c>", lambda event: event.widget.event_generate("<<Copy>>"))
self.bind_class("Text", "<Control-v>", lambda event: event.widget.event_generate("<<Paste>>"))
self.bind_class("Text", "<Control-x>", lambda event: event.widget.event_generate("<<Cut>>"))
self.bind_class("Entry", "<Control-c>", lambda event: event.widget.event_generate("<<Copy>>"))
self.bind_class("Entry", "<Control-v>", lambda event: event.widget.event_generate("<<Paste>>"))
self.bind_class("Entry", "<Control-x>", lambda event: event.widget.event_generate("<<Cut>>"))
self.create_menu() self.create_menu()
self.create_tabs() self.create_tabs()
# Проверка первого запуска # Проверка первого запуска
self.check_first_run() self.check_first_run()
def create_menu(self):
menubar = tk.Menu(self)
self.config(menu=menubar)
# Меню "Файл"
file_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Файл", menu=file_menu)
file_menu.add_command(label="Настройки", command=self.open_settings)
file_menu.add_separator()
file_menu.add_command(label="Выход", command=self.quit)
# Меню "Справка"
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Справка", menu=help_menu)
help_menu.add_command(label="Проверить обновления", command=self.check_for_updates)
help_menu.add_separator()
help_menu.add_command(label="О программе", command=self.open_about)
def check_for_updates(self):
"""Проверка наличия обновлений"""
def on_update_check(update_available, error):
if error:
messagebox.showerror(
"Ошибка проверки обновлений",
f"Не удалось проверить наличие обновлений:\n{error}"
)
elif update_available:
release_info = self.update_checker.get_release_notes()
if release_info:
response = messagebox.askyesno(
"Доступно обновление",
f"Доступна новая версия {release_info['version']}!\n\n"
f"Изменения:\n{release_info['description']}\n\n"
"Хотите перейти на страницу загрузки?",
)
if response:
webbrowser.open(release_info["download_url"])
else:
messagebox.showerror(
"Ошибка",
"Не удалось получить информацию о новой версии"
)
else:
messagebox.showinfo(
"Проверка обновлений",
"У вас установлена последняя версия программы"
)
self.update_checker.check_updates(callback=on_update_check)
def on_settings_changed(self):
"""Обработчик изменения настроек"""
self.settings = settings_load()
def open_settings(self):
"""Открытие окна настроек"""
settings_window = SettingsWindow(self, self.settings, self.on_settings_changed)
settings_window.transient(self)
settings_window.grab_set()
def check_first_run(self): def check_first_run(self):
"""Проверка первого запуска программы""" """Проверка первого запуска программы"""
show_settings = False show_settings = False
@@ -580,31 +765,6 @@ class SerialAppGUI(tk.Tk):
if response: if response:
self.open_settings() self.open_settings()
def create_menu(self):
menubar = tk.Menu(self)
self.config(menu=menubar)
# Меню "Файл"
file_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Файл", menu=file_menu)
file_menu.add_command(label="Настройки", command=self.open_settings)
file_menu.add_separator()
file_menu.add_command(label="Выход", command=self.quit)
# Меню "Справка"
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Справка", menu=help_menu)
help_menu.add_command(label="О программе", command=self.open_about)
def open_settings(self):
settings_window = SettingsWindow(self, self.settings, self.on_settings_changed)
settings_window.transient(self)
settings_window.grab_set()
def on_settings_changed(self):
# Обновляем настройки в основном приложении
self.settings = settings_load()
def create_tabs(self): def create_tabs(self):
self.notebook = ttk.Notebook(self) self.notebook = ttk.Notebook(self)
self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5) self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5)
@@ -613,14 +773,17 @@ class SerialAppGUI(tk.Tk):
interactive_frame = ttk.Frame(self.notebook) interactive_frame = ttk.Frame(self.notebook)
file_exec_frame = ttk.Frame(self.notebook) file_exec_frame = ttk.Frame(self.notebook)
config_editor_frame = ttk.Frame(self.notebook) config_editor_frame = ttk.Frame(self.notebook)
tftp_frame = ttk.Frame(self.notebook)
self.notebook.add(interactive_frame, text="Интерактивный режим") self.notebook.add(interactive_frame, text="Интерактивный режим")
self.notebook.add(file_exec_frame, text="Выполнение файла") self.notebook.add(file_exec_frame, text="Выполнение файла")
self.notebook.add(config_editor_frame, text="Редактор конфигурации") self.notebook.add(config_editor_frame, text="Редактор конфигурации")
self.notebook.add(tftp_frame, text="TFTP Сервер")
self.create_interactive_tab(interactive_frame) self.create_interactive_tab(interactive_frame)
self.create_file_exec_tab(file_exec_frame) self.create_file_exec_tab(file_exec_frame)
self.create_config_editor_tab(config_editor_frame) self.create_config_editor_tab(config_editor_frame)
self.create_tftp_tab(tftp_frame)
# -------------- Вкладка "Интерактивный режим" -------------- # -------------- Вкладка "Интерактивный режим" --------------
def create_interactive_tab(self, frame): def create_interactive_tab(self, frame):
@@ -629,13 +792,13 @@ class SerialAppGUI(tk.Tk):
ttk.Button(control_frame, text="Подключиться", command=self.connect_device).pack(side=LEFT, padx=5) ttk.Button(control_frame, text="Подключиться", command=self.connect_device).pack(side=LEFT, padx=5)
ttk.Button(control_frame, text="Отключиться", command=self.disconnect_device).pack(side=LEFT, padx=5) ttk.Button(control_frame, text="Отключиться", command=self.disconnect_device).pack(side=LEFT, padx=5)
self.interactive_text = tk.Text(frame, wrap="word", height=20) self.interactive_text = CustomText(frame, wrap="word", height=20)
self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5) self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
input_frame = ttk.Frame(frame) input_frame = ttk.Frame(frame)
input_frame.pack(fill=X, pady=5) input_frame.pack(fill=X, pady=5)
ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5) ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5)
self.command_entry = ttk.Entry(input_frame, width=50) self.command_entry = CustomEntry(input_frame, width=50)
self.command_entry.pack(side=LEFT, padx=5) self.command_entry.pack(side=LEFT, padx=5)
ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5) ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5)
@@ -726,10 +889,10 @@ class SerialAppGUI(tk.Tk):
file_frame.pack(fill=X, pady=5) file_frame.pack(fill=X, pady=5)
ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5) ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
self.file_exec_var = StringVar(value=self.settings.get("config_file") or "") self.file_exec_var = StringVar(value=self.settings.get("config_file") or "")
ttk.Entry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5) CustomEntry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5)
ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5) ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5)
ttk.Button(frame, text="Выполнить команды", command=self.execute_file_commands).pack(pady=5) ttk.Button(frame, text="Выполнить команды", command=self.execute_file_commands).pack(pady=5)
self.file_exec_text = tk.Text(frame, wrap="word", height=15) self.file_exec_text = CustomText(frame, wrap="word", height=15)
self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5) self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
def select_config_file_fileexec(self): def select_config_file_fileexec(self):
@@ -775,11 +938,11 @@ class SerialAppGUI(tk.Tk):
top_frame.pack(fill=X, pady=5) top_frame.pack(fill=X, pady=5)
ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5) ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
self.editor_file_var = StringVar(value=self.settings.get("config_file") or "") self.editor_file_var = StringVar(value=self.settings.get("config_file") or "")
ttk.Entry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5) CustomEntry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5) ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5) ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5) ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5)
self.config_editor_text = tk.Text(frame, wrap="word") self.config_editor_text = CustomText(frame, wrap="word")
self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5) self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
def select_config_file_editor(self): def select_config_file_editor(self):
@@ -823,81 +986,270 @@ class SerialAppGUI(tk.Tk):
about_window.transient(self) about_window.transient(self)
about_window.grab_set() about_window.grab_set()
# ========================== def create_tftp_tab(self, frame):
# Парсер аргументов (не используется) """Создание вкладки TFTP сервера."""
# ========================== # Создаем фрейм для управления TFTP сервером
# def parse_arguments(): tftp_frame = ttk.Frame(frame)
# parser = argparse.ArgumentParser( tftp_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
# description="Программа для работы с устройствами через последовательный порт с графическим интерфейсом."
# )
# parser.add_argument("--cli", action="store_true", help="Запустить в режиме командной строки (без графики)")
# return parser.parse_args()
# ========================== # Создаем и размещаем элементы управления
# Режим командной строки (не используется) controls_frame = ttk.LabelFrame(tftp_frame, text="Управление TFTP сервером")
# ========================== controls_frame.pack(fill=X, padx=5, pady=5)
# def run_cli_mode(settings): # IP адрес
# print("Запущен режим командной строки.") ip_frame = ttk.Frame(controls_frame)
# while True: ip_frame.pack(fill=X, padx=5, pady=2)
# print("\n--- Главное меню ---") ttk.Label(ip_frame, text="IP адрес:").pack(side=LEFT, padx=5)
# print("1. Подключиться и войти в интерактивный режим") self.tftp_ip_var = StringVar(value="0.0.0.0")
# print("2. Выполнить команды из файла конфигурации") self.tftp_ip_combo = ttk.Combobox(ip_frame, textvariable=self.tftp_ip_var, state="readonly")
# print("0. Выход") self.tftp_ip_combo.pack(side=LEFT, fill=X, expand=True, padx=5)
# choice = input("Выберите действие: ").strip() ttk.Button(ip_frame, text="Обновить", command=self.update_network_adapters).pack(side=LEFT, padx=5)
# if choice == "1":
# if not settings.get("port"): # Заполняем список адаптеров
# print("[ERROR] COM-порт не выбран!") self.update_network_adapters()
# continue
# conn = create_connection(settings) # Порт
# if not conn: port_frame = ttk.Frame(controls_frame)
# print("[ERROR] Не удалось установить соединение.") port_frame.pack(fill=X, padx=5, pady=2)
# continue ttk.Label(port_frame, text="Порт:").pack(side=LEFT, padx=5)
# print("[INFO] Введите команды (для выхода введите _exit):") self.tftp_port_var = StringVar(value="69")
# while True: self.tftp_port_entry = CustomEntry(port_frame, textvariable=self.tftp_port_var)
# cmd = input("Команда: ") self.tftp_port_entry.pack(fill=X, expand=True, padx=5)
# if cmd.strip().lower() == "_exit":
# break # Кнопки управления
# try: buttons_frame = ttk.Frame(controls_frame)
# conn.write((cmd + "\n").encode()) buttons_frame.pack(fill=X, padx=5, pady=5)
# response = read_response(
# conn, settings.get("timeout", 10), self.start_tftp_button = ttk.Button(
# login=settings.get("login"), buttons_frame,
# password=settings.get("password"), text="Запустить сервер",
# is_gui=False command=self.start_tftp_server
# ) )
# print(response) self.start_tftp_button.pack(side=LEFT, padx=5)
# except Exception as e:
# print(f"[ERROR] {e}") self.stop_tftp_button = ttk.Button(
# break buttons_frame,
# conn.close() text="Остановить сервер",
# elif choice == "2": command=self.stop_tftp_server,
# if not settings.get("config_file"): state="disabled"
# print("[ERROR] Файл конфигурации не выбран!") )
# continue self.stop_tftp_button.pack(side=LEFT, padx=5)
# if not settings.get("port"):
# print("[ERROR] COM-порт не выбран!") # Лог сервера
# continue log_frame = ttk.LabelFrame(tftp_frame, text="Лог сервера")
# conn = create_connection(settings) log_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
# if conn:
# execute_commands_from_file( self.tftp_log_text = CustomText(log_frame, wrap=tk.WORD, height=10)
# conn, self.tftp_log_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
# settings["config_file"],
# settings.get("timeout", 10), # Добавляем скроллбар для лога
# settings.get("copy_mode", "line"), scrollbar = ttk.Scrollbar(self.tftp_log_text, command=self.tftp_log_text.yview)
# settings.get("block_size", 15), scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# lambda msg: print(msg), self.tftp_log_text.config(yscrollcommand=scrollbar.set)
# login=settings.get("login"),
# password=settings.get("password"), # Статус передач
# is_gui=False, transfers_frame = ttk.LabelFrame(tftp_frame, text="Активные передачи")
# ) transfers_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
# conn.close()
# else: # Создаем таблицу для отображения активных передач
# print("[ERROR] Не удалось установить соединение.") columns = ("client", "filename", "progress", "remaining", "time")
# elif choice == "0": self.transfers_tree = ttk.Treeview(transfers_frame, columns=columns, show="headings")
# break
# else: # Настраиваем заголовки колонок
# print("[ERROR] Некорректный выбор.") self.transfers_tree.heading("client", text="Клиент")
self.transfers_tree.heading("filename", text="Файл")
self.transfers_tree.heading("progress", text="Прогресс")
self.transfers_tree.heading("remaining", text="Осталось")
self.transfers_tree.heading("time", text="Время")
# Настраиваем ширину колонок
self.transfers_tree.column("client", width=120)
self.transfers_tree.column("filename", width=150)
self.transfers_tree.column("progress", width=100)
self.transfers_tree.column("remaining", width=100)
self.transfers_tree.column("time", width=80)
self.transfers_tree.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Инициализация TFTP сервера
self.tftp_server = None
self.tftp_server_thread = None
def start_tftp_server(self):
"""Запуск TFTP сервера."""
try:
# Получаем выбранный IP-адрес
ip = self.tftp_ip_var.get()
if not ip:
messagebox.showerror("Ошибка", "Выберите IP-адрес для TFTP сервера")
return
# Проверяем корректность порта
try:
port = int(self.tftp_port_var.get())
if port <= 0 or port > 65535:
raise ValueError("Порт должен быть в диапазоне 1-65535")
except ValueError as e:
messagebox.showerror("Ошибка", f"Некорректный порт: {str(e)}")
return
# Создаем экземпляр TFTP сервера
self.tftp_server = TFTPServer("Firmware")
# Устанавливаем callback для логирования
def log_callback(message):
# Фильтруем дублирующиеся сообщения о запуске/остановке сервера
if "[INFO] TFTP сервер запущен" in message and hasattr(self, '_server_started'):
return
if "[INFO] TFTP сервер остановлен" in message and hasattr(self, '_server_stopped'):
return
self.append_tftp_log(message)
# Устанавливаем флаги для отслеживания состояния
if "[INFO] TFTP сервер запущен" in message:
self._server_started = True
elif "[INFO] TFTP сервер остановлен" in message:
self._server_stopped = True
# Обновляем информацию о передачах
self.update_transfers_info()
self.tftp_server.set_log_callback(log_callback)
# Запускаем сервер в отдельном потоке
self.tftp_server_thread = threading.Thread(
target=self.run_tftp_server,
args=(ip, port),
daemon=True
)
self.tftp_server_thread.start()
# Обновляем состояние кнопок и элементов управления
self.start_tftp_button.config(state="disabled")
self.stop_tftp_button.config(state="normal")
self.tftp_ip_combo.config(state="disabled")
self.tftp_port_entry.config(state="disabled")
# Запускаем периодическое обновление информации о передачах
self.update_transfers_periodically()
except Exception as e:
self.append_tftp_log(f"[ERROR] Ошибка запуска сервера: {str(e)}")
messagebox.showerror("Ошибка", f"Не удалось запустить TFTP сервер: {str(e)}")
def run_tftp_server(self, ip, port):
"""Запуск TFTP сервера в отдельном потоке."""
try:
self.tftp_server.start_server(ip, port)
except Exception as e:
self.append_tftp_log(f"[ERROR] Ошибка работы сервера: {str(e)}")
def stop_tftp_server(self):
"""Остановка TFTP сервера."""
if self.tftp_server:
try:
# Отключаем кнопки на время остановки сервера
self.start_tftp_button.config(state="disabled")
self.stop_tftp_button.config(state="disabled")
# Сбрасываем флаги состояния
if hasattr(self, '_server_started'):
delattr(self, '_server_started')
if hasattr(self, '_server_stopped'):
delattr(self, '_server_stopped')
# Останавливаем сервер
self.tftp_server.stop_server()
# Ждем завершения потока сервера с таймаутом
if self.tftp_server_thread:
self.tftp_server_thread.join(timeout=5.0)
if self.tftp_server_thread.is_alive():
self.append_tftp_log("[WARN] Превышено время ожидания остановки сервера")
# Очищаем ссылки на сервер и поток
self.tftp_server = None
self.tftp_server_thread = None
# Обновляем состояние кнопок
self.start_tftp_button.config(state="normal")
self.stop_tftp_button.config(state="disabled")
self.tftp_ip_combo.config(state="normal")
self.tftp_port_entry.config(state="normal")
# Очищаем таблицу передач
for item in self.transfers_tree.get_children():
self.transfers_tree.delete(item)
except Exception as e:
self.append_tftp_log(f"[ERROR] Ошибка остановки сервера: {str(e)}")
messagebox.showerror("Ошибка", f"Не удалось остановить TFTP сервер: {str(e)}")
# Восстанавливаем состояние кнопок в случае ошибки
self.start_tftp_button.config(state="disabled")
self.stop_tftp_button.config(state="normal")
def append_tftp_log(self, message):
"""Добавление сообщения в лог TFTP сервера."""
self.tftp_log_text.insert(END, message + "\n")
self.tftp_log_text.see(END)
def update_transfers_info(self):
"""Обновление информации об активных передачах."""
if not self.tftp_server:
return
# Очищаем текущие записи
for item in self.transfers_tree.get_children():
self.transfers_tree.delete(item)
# Добавляем информацию о текущих передачах
for client_addr, transfer_info in self.tftp_server.active_transfers.items():
filename = transfer_info['filename']
bytes_sent = transfer_info['bytes_sent']
filesize = transfer_info['filesize']
start_time = transfer_info['start_time']
# Вычисляем прогресс
progress = f"{bytes_sent}/{filesize} байт"
remaining_bytes = filesize - bytes_sent
elapsed_time = time.time() - start_time
# Вычисляем скорость передачи (байт/сек)
if elapsed_time > 0:
transfer_speed = bytes_sent / elapsed_time
# Вычисляем оставшееся время
if transfer_speed > 0:
remaining_time = remaining_bytes / transfer_speed
remaining_str = f"{remaining_bytes} байт (~{int(remaining_time)}с)"
else:
remaining_str = f"{remaining_bytes} байт (неизвестно)"
else:
remaining_str = f"{remaining_bytes} байт (вычисляется...)"
# Добавляем запись в таблицу
self.transfers_tree.insert("", END, values=(
f"{client_addr[0]}:{client_addr[1]}",
filename,
progress,
remaining_str,
f"{elapsed_time:.1f}с"
))
def update_transfers_periodically(self):
"""Периодическое обновление информации о передачах."""
if self.tftp_server and self.tftp_server.running:
self.update_transfers_info()
# Планируем следующее обновление через 1 секунду
self.after(1000, self.update_transfers_periodically)
def update_network_adapters(self):
"""Обновление списка сетевых адаптеров."""
adapters = get_network_adapters()
self.tftp_ip_combo["values"] = adapters
if not self.tftp_ip_var.get() in adapters:
self.tftp_ip_var.set(adapters[0])
# ========================== # ==========================
# Основной запуск приложения # Основной запуск приложения

44
README.md Normal file
View File

@@ -0,0 +1,44 @@
# ComConfigCopy
Программа для копирования конфигураций на коммутаторы.
## Описание
ComConfigCopy - это утилита, разработанная для автоматизации процесса копирования конфигураций на сетевые коммутаторы. Программа предоставляет удобный графический интерфейс для управления процессом копирования и настройки параметров подключения.
## Основные возможности
- Копирование конфигураций на коммутаторы через COM-порт
- Поддержка различных скоростей подключения
- Автоматическое определение доступных COM-портов
- Возможность сохранения и загрузки настроек
- Автоматическое обновление через GitHub
## Системные требования
- Windows 7/8/10/11
- Python 3.8 или выше
- Доступ к COM-портам
## Установка
1. Скачайте последнюю версию программы из [репозитория](https://gitea.filow.ru/LowaSC/ComConfigCopy/releases)
2. Распакуйте архив в удобное место
3. Запустите файл `ComConfigCopy.exe`
## Использование
1. Выберите COM-порт из списка доступных
2. Настройте параметры подключения (скорость, биты данных и т.д.)
3. Выберите файл конфигурации для отправки
4. Нажмите кнопку "Отправить" для начала процесса копирования
## Контакты
- Email: LowaWorkMail@gmail.com
- Telegram: [@LowaSC](https://t.me/LowaSC)
- Репозиторий: [ComConfigCopy](https://gitea.filow.ru/LowaSC/ComConfigCopy)
## Лицензия
Этот проект распространяется под лицензией MIT. Подробности смотрите в файле [LICENSE](LICENSE).

349
TFTPServer.py Normal file
View File

@@ -0,0 +1,349 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TFTP сервер для передачи прошивки с компьютера на коммутатор.
- Создает сервер по заданному IP и порту.
- Расшаривает папку Firmware.
- Показывает текущее состояние сервера и статус передачи файла:
- кому (IP устройства),
- сколько осталось байт,
- сколько передано байт,
- время передачи.
"""
import os
import socket
import struct
import threading
import time
class TFTPServer:
def __init__(self, share_folder):
"""
Инициализация TFTP сервера.
:param share_folder: Путь к папке, содержащей файлы (например, папка 'Firmware')
"""
self.share_folder = share_folder
self.log_callback = None
self.running = False
self.server_socket = None
self.lock = threading.Lock()
self.transfer_sockets = set() # Множество для хранения всех активных сокетов передачи
# Словарь активных передач для мониторинга их статуса.
# Ключ адрес клиента, значение словарь с информацией о передаче.
self.active_transfers = {}
def set_log_callback(self, callback):
"""
Установка функции обратного вызова для логирования сообщений.
:param callback: Функция, принимающая строку сообщения.
"""
self.log_callback = callback
def log(self, message):
"""
Функция логирования: вызывает callback (если задан) или выводит сообщение в консоль.
:param message: Строка с сообщением для логирования.
"""
if self.log_callback:
self.log_callback(message)
else:
print(message)
def start_server(self, ip, port):
"""
Запуск TFTP сервера на указанном IP и порту.
:param ip: IP-адрес для привязки сервера.
:param port: Порт для TFTP сервера.
"""
if self.running:
self.log("[WARN] Сервер уже запущен")
return
self.running = True
try:
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.server_socket.bind((ip, port))
self.log(f"[INFO] TFTP сервер запущен на {ip}:{port}")
while self.running:
try:
self.server_socket.settimeout(1.0)
data, client_addr = self.server_socket.recvfrom(2048)
if data and self.running:
threading.Thread(target=self.handle_request, args=(data, client_addr), daemon=True).start()
except socket.timeout:
continue
except socket.error as e:
if self.running: # Логируем ошибку только если сервер еще запущен
self.log(f"[ERROR] Ошибка получения данных: {str(e)}")
break
except Exception as e:
if self.running: # Логируем ошибку только если сервер еще запущен
self.log(f"[ERROR] Ошибка получения данных: {str(e)}")
break
except Exception as e:
self.log(f"[ERROR] Ошибка запуска сервера: {str(e)}")
finally:
self.running = False
if self.server_socket:
try:
self.server_socket.close()
except:
pass
self.server_socket = None
def stop_server(self):
"""
Остановка TFTP сервера.
"""
if not self.running:
return
self.log("[INFO] Остановка TFTP сервера...")
self.running = False
try:
# Закрываем основной сокет сервера первым
if self.server_socket:
try:
# Создаем временный сокет и отправляем пакет самому себе,
# чтобы разблокировать recvfrom
temp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
server_address = self.server_socket.getsockname()
temp_socket.sendto(b'', server_address)
except:
pass
finally:
try:
temp_socket.close()
except:
pass
try:
self.server_socket.close()
except Exception as e:
self.log(f"[WARN] Ошибка при закрытии основного сокета: {str(e)}")
finally:
self.server_socket = None
# Закрываем все активные сокеты передачи
with self.lock:
active_sockets = list(self.transfer_sockets)
self.transfer_sockets.clear()
active_transfers = dict(self.active_transfers)
self.active_transfers.clear()
# Закрываем сокеты передачи после очистки множества
for sock in active_sockets:
try:
if sock:
sock.close()
except Exception as e:
self.log(f"[WARN] Ошибка при закрытии сокета передачи: {str(e)}")
# Отправляем сообщения об остановке для активных передач
for client_addr, transfer_info in active_transfers.items():
try:
self.send_error(client_addr, 0, "Сервер остановлен")
except:
pass
except Exception as e:
self.log(f"[ERROR] Ошибка при остановке сервера: {str(e)}")
finally:
self.running = False # Гарантируем, что флаг running будет False
self.log("[INFO] TFTP сервер остановлен")
def handle_request(self, data, client_addr):
"""
Обработка входящего запроса от клиента.
:param data: Полученные данные (UDP-пакет).
:param client_addr: Адрес клиента, отправившего пакет.
"""
if len(data) < 2:
self.log(f"[WARN] Получен некорректный пакет от {client_addr}")
return
opcode = struct.unpack("!H", data[:2])[0]
if opcode == 1: # RRQ (Read Request) запрос на чтение файла
self.handle_rrq(data, client_addr)
else:
self.log(f"[WARN] Неподдерживаемый запрос (опкод {opcode}) от {client_addr}")
def handle_rrq(self, data, client_addr):
"""
Обработка запроса на чтение файла (RRQ).
:param data: Данные запроса.
:param client_addr: Адрес клиента.
"""
try:
# RRQ формата: 2 байта опкода, затем строка имени файла, за которой следует 0,
# затем строка режима (например, "octet"), и завершается 0.
parts = data[2:].split(b'\0')
if len(parts) < 2:
self.log(f"[WARN] Некорректный RRQ пакет от {client_addr}")
return
filename = parts[0].decode('utf-8')
mode = parts[1].decode('utf-8').lower()
self.log(f"[INFO] Получен RRQ от {client_addr}: файл '{filename}', режим '{mode}'")
if mode != "octet":
self.send_error(client_addr, 0, "Поддерживается только octet режим")
return
file_path = os.path.join(self.share_folder, filename)
if not os.path.isfile(file_path):
self.send_error(client_addr, 1, "Файл не найден")
return
# Запускаем передачу файла в новом потоке.
threading.Thread(target=self.send_file, args=(file_path, client_addr), daemon=True).start()
except Exception as e:
self.log(f"[ERROR] Ошибка обработки RRQ: {str(e)}")
def send_error(self, client_addr, error_code, error_message):
"""
Отправка сообщения об ошибке клиенту.
:param client_addr: Адрес клиента.
:param error_code: Код ошибки.
:param error_message: Текст ошибки.
"""
# Формируем TFTP пакет ошибки: 2 байта опкода (5), 2 байта кода ошибки, сообщение об ошибке и завершающий 0.
packet = struct.pack("!HH", 5, error_code) + error_message.encode('utf-8') + b'\0'
self.server_socket.sendto(packet, client_addr)
self.log(f"[INFO] Отправлено сообщение об ошибке '{error_message}' клиенту {client_addr}")
def send_file(self, file_path, client_addr):
"""
Передача файла клиенту по протоколу TFTP.
"""
BLOCK_SIZE = 512
MAX_RETRIES = 5
TIMEOUT = 2.0
transfer_socket = None
try:
if not os.path.exists(file_path):
self.log(f"[ERROR] Файл '{file_path}' не существует")
self.send_error(client_addr, 1, "Файл не найден")
return
filesize = os.path.getsize(file_path)
if filesize == 0:
self.log(f"[ERROR] Файл '{file_path}' пуст")
self.send_error(client_addr, 0, "Файл пуст")
return
start_time = time.time()
file_basename = os.path.basename(file_path)
# Регистрируем активную передачу
with self.lock:
self.active_transfers[client_addr] = {
'filename': file_basename,
'filesize': filesize,
'bytes_sent': 0,
'start_time': start_time
}
# Создаем новый сокет для передачи данных
transfer_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
transfer_socket.settimeout(TIMEOUT)
with self.lock:
self.transfer_sockets.add(transfer_socket)
self.log(f"[INFO] Начало передачи файла '{file_basename}' клиенту {client_addr}. Размер файла: {filesize} байт.")
with open(file_path, 'rb') as file:
block_number = 1
last_successful_block = 0
while True:
# Читаем блок данных
data = file.read(BLOCK_SIZE)
# Формируем и отправляем пакет данных
packet = struct.pack('!HH', 3, block_number) + data
retries = 0
while retries < MAX_RETRIES:
try:
transfer_socket.sendto(packet, client_addr)
# Ожидаем подтверждение
while True:
try:
ack_data, ack_addr = transfer_socket.recvfrom(4)
if ack_addr == client_addr and len(ack_data) >= 4:
opcode, ack_block = struct.unpack('!HH', ack_data)
if opcode == 4: # ACK
if ack_block == block_number:
# Успешное подтверждение
last_successful_block = block_number
bytes_sent = min((block_number * BLOCK_SIZE), filesize)
# Обновляем информацию о прогрессе
with self.lock:
if client_addr in self.active_transfers:
self.active_transfers[client_addr]['bytes_sent'] = bytes_sent
# Логируем статус каждую секунду
current_time = time.time()
if current_time - start_time >= 1.0:
bytes_remaining = filesize - bytes_sent
elapsed_time = current_time - start_time
self.log(f"[STATUS] Клиент: {client_addr} | Файл: {file_basename} | "
f"Отправлено: {bytes_sent}/{filesize} байт | "
f"Осталось: {bytes_remaining} байт | "
f"Время: {elapsed_time:.2f} сек.")
break
elif ack_block < block_number:
# Получен старый ACK, игнорируем
continue
except socket.timeout:
break
if last_successful_block == block_number:
break
else:
retries += 1
self.log(f"[WARN] Таймаут ожидания ACK для блока {block_number} от {client_addr}. "
f"Попытка {retries + 1}.")
except Exception as e:
retries += 1
self.log(f"[ERROR] Ошибка при передаче блока {block_number}: {str(e)}")
if retries >= MAX_RETRIES:
self.log(f"[ERROR] Превышено максимальное количество попыток для блока {block_number}")
return
block_number += 1
# Если отправили меньше BLOCK_SIZE байт, это был последний блок
if len(data) < BLOCK_SIZE:
break
self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} завершена успешно")
except Exception as e:
self.log(f"[ERROR] Ошибка при передаче файла: {str(e)}")
finally:
# Очищаем информацию о передаче
with self.lock:
if client_addr in self.active_transfers:
del self.active_transfers[client_addr]
if transfer_socket in self.transfer_sockets:
self.transfer_sockets.remove(transfer_socket)
if transfer_socket:
try:
transfer_socket.close()
except:
pass

View File

@@ -2,19 +2,22 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import tkinter as tk import tkinter as tk
from tkinter import ttk, BOTH, X, BOTTOM from tkinter import ttk, BOTH, X, BOTTOM, END
import webbrowser import webbrowser
class AboutWindow(tk.Toplevel): class AboutWindow(tk.Toplevel):
def __init__(self, parent): def __init__(self, parent):
super().__init__(parent) super().__init__(parent)
self.title("О программе") self.title("О программе")
self.geometry("400x300") self.geometry("600x500")
self.resizable(False, False) self.resizable(False, False)
# Создаем фрейм # Сохраняем ссылку на родительское окно
self.parent = parent
# Создаем фрейм для содержимого
about_frame = ttk.Frame(self, padding="20") about_frame = ttk.Frame(self, padding="20")
about_frame.pack(fill=BOTH, expand=True) about_frame.pack(fill=BOTH, expand=True, padx=10, pady=10)
# Заголовок # Заголовок
ttk.Label( ttk.Label(
@@ -33,7 +36,7 @@ class AboutWindow(tk.Toplevel):
# Версия # Версия
ttk.Label( ttk.Label(
about_frame, about_frame,
text="Версия 1.0", text=f"Версия {getattr(parent, 'VERSION', '1.0.0')}",
font=("Segoe UI", 10) font=("Segoe UI", 10)
).pack(pady=(0, 20)) ).pack(pady=(0, 20))
@@ -80,14 +83,14 @@ class AboutWindow(tk.Toplevel):
# Кнопка закрытия # Кнопка закрытия
ttk.Button( ttk.Button(
about_frame, self,
text="Закрыть", text="Закрыть",
command=self.destroy command=self.destroy
).pack(side=BOTTOM, pady=(20, 0)) ).pack(side=BOTTOM, pady=10)
# Центрируем окно # Центрируем окно
self.center_window() self.center_window()
def center_window(self): def center_window(self):
self.update_idletasks() self.update_idletasks()
width = self.winfo_width() width = self.winfo_width()
@@ -95,6 +98,6 @@ class AboutWindow(tk.Toplevel):
x = (self.winfo_screenwidth() // 2) - (width // 2) x = (self.winfo_screenwidth() // 2) - (width // 2)
y = (self.winfo_screenheight() // 2) - (height // 2) y = (self.winfo_screenheight() // 2) - (height // 2)
self.geometry(f"{width}x{height}+{x}+{y}") self.geometry(f"{width}x{height}+{x}+{y}")
def open_url(self, url): def open_url(self, url):
webbrowser.open(url) webbrowser.open(url)

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
tftpy>=0.8.0
pyserial>=3.5
requests>=2.31.0
packaging>=23.2

175
update_checker.py Normal file
View File

@@ -0,0 +1,175 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import logging
import requests
import threading
from packaging import version
class UpdateCheckError(Exception):
"""Исключение для ошибок проверки обновлений"""
pass
class UpdateChecker:
"""Класс для проверки обновлений программы"""
def __init__(self, current_version, repo_url):
self.current_version = current_version
self.repo_url = repo_url
# Формируем базовый URL API
self.api_url = repo_url.replace("gitea.filow.ru", "gitea.filow.ru/api/v1/repos/LowaSC/ComConfigCopy")
self._update_available = False
self._latest_version = None
self._latest_release = None
self._error = None
self._changelog = None
def get_changelog(self, callback=None):
"""
Получение changelog из репозитория.
:param callback: Функция обратного вызова, которая будет вызвана после получения changelog
"""
def fetch():
try:
# Пытаемся получить CHANGELOG.md из репозитория
response = requests.get(f"{self.api_url}/contents/CHANGELOG.md", timeout=10)
response.raise_for_status()
content = response.json()
if "content" in content:
import base64
changelog_content = base64.b64decode(content["content"]).decode("utf-8")
self._changelog = changelog_content
self._error = None
else:
raise UpdateCheckError("Не удалось получить содержимое CHANGELOG.md")
except requests.RequestException as e:
error_msg = f"Ошибка получения changelog: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
self._changelog = None
except Exception as e:
error_msg = f"Неизвестная ошибка при получении changelog: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
self._changelog = None
finally:
if callback:
callback(self._changelog, self._error)
# Запускаем получение в отдельном потоке
threading.Thread(target=fetch, daemon=True).start()
def check_updates(self, callback=None):
"""
Проверка наличия обновлений.
:param callback: Функция обратного вызова, которая будет вызвана после проверки
"""
def check():
try:
response = requests.get(f"{self.api_url}/releases", timeout=10)
response.raise_for_status()
releases = response.json()
if not releases:
raise UpdateCheckError("Не найдено релизов в репозитории")
latest_release = releases[0]
latest_version = latest_release.get("tag_name", "").lstrip("v")
if not latest_version:
raise UpdateCheckError("Не удалось определить версию последнего релиза")
try:
if version.parse(latest_version) > version.parse(self.current_version):
self._update_available = True
self._latest_version = latest_version
self._latest_release = latest_release
logging.info(f"Доступно обновление: {latest_version}")
else:
logging.info("Обновления не требуются")
except version.InvalidVersion as e:
raise UpdateCheckError(f"Некорректный формат версии: {e}")
self._error = None
except requests.RequestException as e:
error_msg = f"Ошибка сетевого подключения: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
except UpdateCheckError as e:
logging.error(str(e), exc_info=True)
self._error = str(e)
except Exception as e:
error_msg = f"Неизвестная ошибка при проверке обновлений: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
finally:
if callback:
callback(self._update_available, self._error)
@property
def update_available(self):
"""Доступно ли обновление"""
return self._update_available
@property
def latest_version(self):
"""Последняя доступная версия"""
return self._latest_version
@property
def error(self):
"""Последняя ошибка при проверке обновлений"""
return self._error
@property
def changelog(self):
"""Текущий changelog"""
return self._changelog
def get_release_notes(self):
"""Получение информации о последнем релизе"""
if self._latest_release:
return {
"version": self._latest_version,
"description": self._latest_release.get("body", ""),
"download_url": self._latest_release.get("assets", [{}])[0].get("browser_download_url", "")
}
return None
def get_releases(self, callback=None):
"""
Получение списка релизов из репозитория.
:param callback: Функция обратного вызова, которая будет вызвана после получения списка релизов
"""
def fetch():
try:
response = requests.get(f"{self.api_url}/releases", timeout=10)
response.raise_for_status()
releases = response.json()
if not releases:
raise UpdateCheckError("Не найдено релизов в репозитории")
self._error = None
if callback:
callback(releases, None)
except requests.RequestException as e:
error_msg = f"Ошибка сетевого подключения: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
if callback:
callback(None, error_msg)
except Exception as e:
error_msg = f"Ошибка при получении списка релизов: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
if callback:
callback(None, error_msg)
# Запускаем получение в отдельном потоке
threading.Thread(target=fetch, daemon=True).start()