15 Commits

Author SHA1 Message Date
f5935d6b8f Merge pull request 'v1.0.1' (#4) from v1.0.1 into main
Reviewed-on: #4
2025-02-16 22:02:20 +00:00
1a511ff54f Enhance update checking
- Refactor UpdateChecker
- Add support for parsing release details with improved formatting
- Implement more robust version comparison and release type handling
- Add logging for update checking process
- Improve error handling and release information extraction
- Update update checking logic to handle stable and pre-release versions
2025-02-17 00:53:24 +03:00
f84e20631b Update contact email in README.md
- Change contact email from LowaWorkMail@gmail.com to SPRF555@gmail.com
2025-02-17 00:11:10 +03:00
4a67e70a92 Обновить README.md 2025-02-16 20:52:02 +00:00
12562e615f Bump version to 1.0.1
- Update application version number in ComConfigCopy.py
- Minor version increment to reflect recent changes
2025-02-16 23:48:59 +03:00
7ebeb52808 Refactor and optimize code structure with base widget and utility functions
- Create CustomWidgetBase for shared context menu and shortcut functionality
- Add utility functions for common tasks like text appending and file selection
- Simplify command processing with a generic send_command_and_process_response function
- Remove redundant comments and unused imports
- Improve code organization and readability
- Enhance modularity of custom widgets and utility methods
2025-02-16 23:45:19 +03:00
a140b7d8a0 Clean up .gitignore file
- Remove specific config and test files from tracking
- Add Configs/ directory to ignore list
- Remove unnecessary icon and test script entries
2025-02-16 05:36:53 +03:00
2c9edcd859 Remove outdated ComConfigCopy executable binary 2025-02-16 05:35:35 +03:00
5a00efd175 Update executable binary after feature removal 2025-02-16 05:08:44 +03:00
2f4b2985cd Remove documentation and CLI mode features
- Remove "Documentation" menu option
- Delete unused CLI mode code
- Clean up commented-out argument parsing function
2025-02-16 05:00:43 +03:00
d1a870fed7 Add update checking and documentation features
- Implement UpdateChecker for version comparison and update notifications
- Add menu options for documentation and update checking
- Enhance AboutWindow with dynamic version display
- Update requirements.txt with new dependencies
- Create infrastructure for opening local documentation
- Improve application menu with additional help options
2025-02-16 04:50:33 +03:00
2e2dd9e705 Add custom text and entry widgets with enhanced copy/paste functionality
- Implement CustomText and CustomEntry classes with advanced text interaction features
- Add context menu for text widgets with cut, copy, paste, and select all options
- Support multiple keyboard shortcuts for text manipulation
- Replace standard Tkinter Text and Entry widgets with custom implementations
- Remove global text/entry widget bindings in favor of class-specific methods
2025-02-16 03:57:48 +03:00
d937042ea2 Update application title to reflect project name
- Change window title from "Serial Device Manager" to "ComConfigCopy"
2025-02-16 03:53:21 +03:00
136c7877d3 Refactor TFTP file transfer with improved reliability and error handling
- Implement more robust file transfer mechanism with configurable retry and timeout settings
- Add detailed logging for transfer progress and error scenarios
- Enhance block transfer logic with better error recovery
- Simplify transfer socket management and cleanup process
- Improve overall transfer reliability and error tracking
2025-02-16 03:50:27 +03:00
467d582095 Improve file transfer progress tracking and display
- Add dynamic transfer speed calculation
- Compute and display estimated remaining transfer time
- Enhance remaining bytes display with more informative status
- Update transfers table with more detailed transfer progress information
2025-02-16 03:43:34 +03:00
9 changed files with 634 additions and 351 deletions

5
.gitignore vendored
View File

@@ -1,9 +1,6 @@
app.log app.log
Settings/settings.json Settings/settings.json
Configs/Eltex MES2424 AC - Сеть FTTB 2G, доп.txt Configs/
Configs/конфиг доп 3750-52 с айпи 172.17.141.133 .txt
DALL·E 2024-12-29 01.01.02 - Square vector logo_ A clean and minimalistic app icon for serial port management software. The design prominently features a simplified rectangular CO.ico
test.py
__pycache__/ __pycache__/
Firmware/1.jpg Firmware/1.jpg
Firmware/2 Firmware/2

View File

@@ -5,11 +5,6 @@
# Это программа для копирования конфигураций на коммутаторы # Это программа для копирования конфигураций на коммутаторы
# ------------------------------------------------------------ # ------------------------------------------------------------
# import argparse Использовался для получения аргументов из командной строки (не используется)
# import platform Использовался для получения списка сетевых адаптеров (не используется)
# import subprocess Использовался для получения списка сетевых адаптеров (не используется)
# import socket не используется
import json import json
import logging import logging
import os import os
@@ -17,6 +12,7 @@ import re
import sys import sys
import threading import threading
import time import time
import webbrowser
from getpass import getpass from getpass import getpass
from logging.handlers import RotatingFileHandler from logging.handlers import RotatingFileHandler
import tkinter as tk import tkinter as tk
@@ -38,24 +34,24 @@ import serial.tools.list_ports
from serial.serialutil import SerialException from serial.serialutil import SerialException
from about_window import AboutWindow from about_window import AboutWindow
from TFTPServer import TFTPServer from TFTPServer import TFTPServer
# from TFTPServer import TFTPServerThread
import socket import socket
from update_checker import UpdateChecker
# Версия программы
VERSION = "1.0.1"
# Создаем необходимые папки # Создаем необходимые папки
os.makedirs("Logs", exist_ok=True) os.makedirs("Logs", exist_ok=True)
os.makedirs("Configs", exist_ok=True) os.makedirs("Configs", exist_ok=True)
os.makedirs("Settings", exist_ok=True) os.makedirs("Settings", exist_ok=True)
os.makedirs("Firmware", exist_ok=True) os.makedirs("Firmware", exist_ok=True)
os.makedirs("docs", exist_ok=True)
# Файл настроек находится в папке Settings # Файл настроек находится в папке Settings
SETTINGS_FILE = os.path.join("Settings", "settings.json") SETTINGS_FILE = os.path.join("Settings", "settings.json")
# ========================== # Настройка логирования с использованием RotatingFileHandler.
# Функции работы с настройками и логированием
# ==========================
def setup_logging(): def setup_logging():
"""Настройка логирования с использованием RotatingFileHandler."""
logger = logging.getLogger() logger = logging.getLogger()
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
log_path = os.path.join("Logs", "app.log") log_path = os.path.join("Logs", "app.log")
@@ -66,8 +62,8 @@ def setup_logging():
handler.setFormatter(formatter) handler.setFormatter(formatter)
logger.addHandler(handler) logger.addHandler(handler)
# Загрузка настроек из JSON-файла или создание настроек по умолчанию.
def settings_load(): def settings_load():
"""Загрузка настроек из JSON-файла или создание настроек по умолчанию."""
default_settings = { default_settings = {
"port": None, # Порт для подключения "port": None, # Порт для подключения
"baudrate": 9600, # Скорость передачи данных "baudrate": 9600, # Скорость передачи данных
@@ -120,8 +116,8 @@ def settings_load():
logging.error(f"Ошибка загрузки файла настроек: {e}", exc_info=True) logging.error(f"Ошибка загрузки файла настроек: {e}", exc_info=True)
return default_settings return default_settings
# Сохранение настроек в JSON-файл
def settings_save(settings): def settings_save(settings):
"""Сохранение настроек в JSON-файл."""
try: try:
with open(SETTINGS_FILE, "w", encoding="utf-8") as f: with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
json.dump(settings, f, indent=4, ensure_ascii=False) json.dump(settings, f, indent=4, ensure_ascii=False)
@@ -129,18 +125,15 @@ def settings_save(settings):
except Exception as e: except Exception as e:
logging.error(f"Ошибка при сохранении настроек: {e}", exc_info=True) logging.error(f"Ошибка при сохранении настроек: {e}", exc_info=True)
# Получение списка доступных последовательных портов
def list_serial_ports(): def list_serial_ports():
"""Получение списка доступных последовательных портов.""" """Получение списка доступных последовательных портов."""
ports = serial.tools.list_ports.comports() ports = serial.tools.list_ports.comports()
logging.debug(f"Найдено {len(ports)} серийных портов.") logging.debug(f"Найдено {len(ports)} серийных портов.")
return [port.device for port in ports] return [port.device for port in ports]
# ========================== # Получение списка IP-адресов из сетевых адаптеров
# Функции работы с сетевыми адаптерами (не используются)
# ==========================
def get_network_adapters(): def get_network_adapters():
"""Получение списка сетевых адаптеров и их IP-адресов."""
adapters = [] adapters = []
try: try:
# Получаем имя хоста # Получаем имя хоста
@@ -171,12 +164,8 @@ def get_network_adapters():
return adapters return adapters
# ========================== # Создание соединения с устройством через последовательный порт
# Функции работы с COM-соединением
# ==========================
def create_connection(settings): def create_connection(settings):
"""Создание соединения с устройством через последовательный порт."""
try: try:
conn = serial.Serial(settings["port"], settings["baudrate"], timeout=1) conn = serial.Serial(settings["port"], settings["baudrate"], timeout=1)
logging.info(f"Соединение установлено с {settings['port']} на {settings['baudrate']}.") logging.info(f"Соединение установлено с {settings['port']} на {settings['baudrate']}.")
@@ -188,12 +177,8 @@ def create_connection(settings):
logging.error(f"Неизвестная ошибка подключения: {e}", exc_info=True) logging.error(f"Неизвестная ошибка подключения: {e}", exc_info=True)
return None return None
# Проверка наличия логина и пароля в настройках и отправка их на устройство # Проверка наличия логина и пароля в настройках и отправка их на устройство
def send_login_password(serial_connection, login=None, password=None, is_gui=False): def send_login_password(serial_connection, login=None, password=None, is_gui=False):
"""Отправка логина и пароля на устройство."""
if not login: if not login:
if is_gui: if is_gui:
login = simpledialog.askstring("Login", "Введите логин:") login = simpledialog.askstring("Login", "Введите логин:")
@@ -210,14 +195,8 @@ def send_login_password(serial_connection, login=None, password=None, is_gui=Fal
else: else:
password = getpass("Введите пароль: ") password = getpass("Введите пароль: ")
# Чтение ответа от устройства с учётом таймаута. # Чтение ответа от устройства с учётом таймаута.
def read_response(serial_connection, timeout, login=None, password=None, is_gui=False): def read_response(serial_connection, timeout, login=None, password=None, is_gui=False):
"""
Чтение ответа от устройства с учётом таймаута.
Если в последней строке появляется запрос логина или пароля, данные отправляются автоматически.
"""
response = b"" response = b""
end_time = time.time() + timeout end_time = time.time() + timeout
decoded = "" decoded = ""
@@ -254,8 +233,8 @@ def read_response(serial_connection, timeout, login=None, password=None, is_gui=
time.sleep(0.1) time.sleep(0.1)
return decoded return decoded
# Генерация блоков команд для блочного копирования
def generate_command_blocks(lines, block_size): def generate_command_blocks(lines, block_size):
"""Генерация блоков команд для блочного копирования."""
blocks = [] blocks = []
current_block = [] current_block = []
for line in lines: for line in lines:
@@ -281,6 +260,7 @@ def generate_command_blocks(lines, block_size):
blocks.append("\n".join(current_block)) blocks.append("\n".join(current_block))
return blocks return blocks
# Выполнение команд из файла конфигурации
def execute_commands_from_file( def execute_commands_from_file(
serial_connection, serial_connection,
filename, filename,
@@ -292,12 +272,6 @@ def execute_commands_from_file(
password=None, password=None,
is_gui=False, is_gui=False,
): ):
"""
Выполнение команд из файла конфигурации.
Если передан log_callback, вывод будет отображаться в GUI.
Теперь при обнаружении ошибки (например, если в ответе присутствует символ '^')
команда будет отправляться повторно.
"""
try: try:
with open(filename, "r", encoding="utf-8") as file: with open(filename, "r", encoding="utf-8") as file:
lines = [line for line in file if line.strip()] lines = [line for line in file if line.strip()]
@@ -436,10 +410,81 @@ def execute_commands_from_file(
log_callback(msg) log_callback(msg)
logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True) logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True)
# ========================== # Базовый класс для кастомных виджетов с поддержкой контекстного меню и горячих клавиш
# Графический интерфейс (Tkinter) class CustomWidgetBase:
# ==========================
def __init__(self):
self.create_context_menu()
self.bind_shortcuts()
# Создание контекстного меню
def create_context_menu(self):
"""Создание контекстного меню"""
self.context_menu = tk.Menu(self, tearoff=0)
self.context_menu.add_command(label="Вырезать", command=self.cut)
self.context_menu.add_command(label="Копировать", command=self.copy)
self.context_menu.add_command(label="Вставить", command=self.paste)
self.context_menu.add_separator()
self.context_menu.add_command(label="Выделить всё", command=self.select_all)
self.bind("<Button-3>", self.show_context_menu)
# Привязка горячих клавиш
def bind_shortcuts(self):
"""Привязка горячих клавиш"""
# Стандартные сочетания
self.bind("<Control-x>", lambda e: self.event_generate("<<Cut>>"))
self.bind("<Control-c>", lambda e: self.event_generate("<<Copy>>"))
self.bind("<Control-v>", lambda e: self.event_generate("<<Paste>>"))
self.bind("<Control-a>", self.select_all)
# Дополнительные сочетания
self.bind("<Shift-Insert>", lambda e: self.event_generate("<<Paste>>"))
self.bind("<Control-Insert>", lambda e: self.event_generate("<<Copy>>"))
self.bind("<Shift-Delete>", lambda e: self.event_generate("<<Cut>>"))
# Отображение контекстного меню
def show_context_menu(self, event):
"""Отображение контекстного меню"""
self.context_menu.post(event.x_root, event.y_root)
# Вырезание текста
def cut(self):
self.event_generate("<<Cut>>")
# Копирование текста
def copy(self):
self.event_generate("<<Copy>>")
# Вставка текста
def paste(self):
self.event_generate("<<Paste>>")
# Класс для текстового поля с поддержкой контекстного меню и горячих клавиш
class CustomText(tk.Text, CustomWidgetBase):
def __init__(self, *args, **kwargs):
tk.Text.__init__(self, *args, **kwargs)
CustomWidgetBase.__init__(self)
def select_all(self, event=None):
"""Выделение всего текста в Text виджете"""
self.tag_add(tk.SEL, "1.0", tk.END)
self.mark_set(tk.INSERT, "1.0")
self.see(tk.INSERT)
return "break"
# Класс для поля ввода с поддержкой контекстного меню и горячих клавиш
class CustomEntry(ttk.Entry, CustomWidgetBase):
def __init__(self, *args, **kwargs):
ttk.Entry.__init__(self, *args, **kwargs)
CustomWidgetBase.__init__(self)
def select_all(self, event=None):
"""Выделение всего текста в Entry виджете"""
self.select_range(0, tk.END)
return "break"
# Класс для окна настроек
class SettingsWindow(tk.Toplevel): class SettingsWindow(tk.Toplevel):
def __init__(self, parent, settings, callback=None): def __init__(self, parent, settings, callback=None):
super().__init__(parent) super().__init__(parent)
@@ -479,20 +524,21 @@ class SettingsWindow(tk.Toplevel):
copy_mode_frame = ttk.Frame(settings_frame) copy_mode_frame = ttk.Frame(settings_frame)
copy_mode_frame.grid(row=3, column=1, sticky=W, pady=5) copy_mode_frame.grid(row=3, column=1, sticky=W, pady=5)
ttk.Radiobutton(copy_mode_frame, text="Построчно", value="line", ttk.Radiobutton(copy_mode_frame, text="Построчно", value="line",
variable=self.copy_mode_var).pack(side=LEFT) variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT)
ttk.Radiobutton(copy_mode_frame, text="Блоками", value="block", ttk.Radiobutton(copy_mode_frame, text="Блоками", value="block",
variable=self.copy_mode_var).pack(side=LEFT) variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT)
# Размер блока # Размер блока
ttk.Label(settings_frame, text="Размер блока:").grid(row=4, column=0, sticky=W, pady=5) self.block_size_label = ttk.Label(settings_frame, text="Размер блока:")
self.block_size_label.grid(row=4, column=0, sticky=W, pady=5)
self.block_size_var = StringVar(value=str(settings.get("block_size", 15))) self.block_size_var = StringVar(value=str(settings.get("block_size", 15)))
block_size_entry = ttk.Entry(settings_frame, textvariable=self.block_size_var) self.block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var)
block_size_entry.grid(row=4, column=1, sticky=W, pady=5) self.block_size_entry.grid(row=4, column=1, sticky=W, pady=5)
# Приглашение командной строки # Приглашение командной строки
ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5) ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5)
self.prompt_var = StringVar(value=settings.get("prompt", ">")) self.prompt_var = StringVar(value=settings.get("prompt", ">"))
prompt_entry = ttk.Entry(settings_frame, textvariable=self.prompt_var) prompt_entry = CustomEntry(settings_frame, textvariable=self.prompt_var)
prompt_entry.grid(row=5, column=1, sticky=W, pady=5) prompt_entry.grid(row=5, column=1, sticky=W, pady=5)
# Кнопки # Кнопки
@@ -503,9 +549,22 @@ class SettingsWindow(tk.Toplevel):
self.update_ports() self.update_ports()
# Инициализация видимости поля размера блока
self.toggle_block_size()
# Центрируем окно # Центрируем окно
self.center_window() self.center_window()
# Переключение видимости поля размера блока
def toggle_block_size(self):
if self.copy_mode_var.get() == "line":
self.block_size_label.grid_remove()
self.block_size_entry.grid_remove()
else:
self.block_size_label.grid()
self.block_size_entry.grid()
# Центрирование окна
def center_window(self): def center_window(self):
self.update_idletasks() self.update_idletasks()
width = self.winfo_width() width = self.winfo_width()
@@ -514,12 +573,14 @@ class SettingsWindow(tk.Toplevel):
y = (self.winfo_screenheight() // 2) - (height // 2) y = (self.winfo_screenheight() // 2) - (height // 2)
self.geometry(f"{width}x{height}+{x}+{y}") self.geometry(f"{width}x{height}+{x}+{y}")
# Обновление списка доступных последовательных портов
def update_ports(self): def update_ports(self):
ports = list_serial_ports() ports = list_serial_ports()
self.port_combo["values"] = ports self.port_combo["values"] = ports
if ports and not self.port_var.get(): if ports and not self.port_var.get():
self.port_var.set(ports[0]) self.port_var.set(ports[0])
# Сохранение настроек
def save_settings(self): def save_settings(self):
try: try:
self.settings.update({ self.settings.update({
@@ -538,35 +599,179 @@ class SettingsWindow(tk.Toplevel):
except ValueError as e: except ValueError as e:
messagebox.showerror("Ошибка", "Проверьте правильность введенных значений") messagebox.showerror("Ошибка", "Проверьте правильность введенных значений")
# Общая функция для добавления текста в текстовое поле
def append_text_to_widget(widget, text):
# Проверяем, заканчивается ли текст символом новой строки
if not text.endswith('\n'):
text += '\n'
widget.insert(END, text)
widget.see(END)
# Общая функция для выбора файла конфигурации
def select_config_file(self, var, save_to_settings=False):
filename = filedialog.askopenfilename(
title="Выберите файл конфигурации",
filetypes=[("Text files", "*.txt")]
)
if filename:
var.set(filename)
if save_to_settings:
self.settings["config_file"] = filename
settings_save(self.settings)
# Общая функция для отправки команды и обработки ответа
def send_command_and_process_response(
serial_connection,
cmd,
timeout,
max_attempts=3,
log_callback=None,
login=None,
password=None,
is_gui=False
):
attempt = 0
while attempt < max_attempts:
msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n"
if log_callback:
log_callback(msg)
serial_connection.write((cmd + "\n").encode())
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})")
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
if response:
if '^' in response:
msg = (
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
f"Ответ устройства:\n{response}\n"
f"Повторная отправка команды...\n"
)
if log_callback:
log_callback(msg)
logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.")
attempt += 1
time.sleep(1)
continue
else:
msg = f"Ответ устройства:\n{response}\n"
if log_callback:
log_callback(msg)
logging.info(f"Ответ устройства:\n{response}")
return True, response
else:
msg = f"Ответ не получен для команды: {cmd}\n"
if log_callback:
log_callback(msg)
logging.warning(f"Нет ответа для команды: {cmd}")
return False, None
msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n"
if log_callback:
log_callback(msg)
logging.error(msg)
return False, None
# Основной класс для графического интерфейса
class SerialAppGUI(tk.Tk): class SerialAppGUI(tk.Tk):
def __init__(self, settings): def __init__(self, settings):
super().__init__() super().__init__()
self.title("Serial Device Manager") self.title("ComConfigCopy")
self.geometry("900x700") self.geometry("900x700")
# Добавляем VERSION как атрибут класса
self.VERSION = VERSION
# Инициализация проверки обновлений
self.update_checker = UpdateChecker(
VERSION,
"https://gitea.filow.ru/LowaSC/ComConfigCopy",
include_prereleases=False # Проверяем только стабильные версии
)
# Настройка стиля
self.style = ttk.Style(self) self.style = ttk.Style(self)
self.style.theme_use("clam") self.style.theme_use("clam")
default_font = ("Segoe UI", 10) default_font = ("Segoe UI", 10)
self.option_add("*Font", default_font) self.option_add("*Font", default_font)
self.settings = settings self.settings = settings
self.connection = None self.connection = None
self.tftp_server = None self.tftp_server = None
# Глобальные биндинги
self.bind_class("Text", "<Control-c>", lambda event: event.widget.event_generate("<<Copy>>"))
self.bind_class("Text", "<Control-v>", lambda event: event.widget.event_generate("<<Paste>>"))
self.bind_class("Text", "<Control-x>", lambda event: event.widget.event_generate("<<Cut>>"))
self.bind_class("Entry", "<Control-c>", lambda event: event.widget.event_generate("<<Copy>>"))
self.bind_class("Entry", "<Control-v>", lambda event: event.widget.event_generate("<<Paste>>"))
self.bind_class("Entry", "<Control-x>", lambda event: event.widget.event_generate("<<Cut>>"))
self.create_menu() self.create_menu()
self.create_tabs() self.create_tabs()
# Проверка первого запуска # Проверка первого запуска
self.check_first_run() self.check_first_run()
# Создание меню
def create_menu(self):
menubar = tk.Menu(self)
self.config(menu=menubar)
# Меню "Файл"
file_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Файл", menu=file_menu)
file_menu.add_command(label="Настройки", command=self.open_settings)
file_menu.add_separator()
file_menu.add_command(label="Выход", command=self.quit)
# Меню "Справка"
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Справка", menu=help_menu)
help_menu.add_command(label="Проверить обновления", command=self.check_for_updates)
help_menu.add_separator()
help_menu.add_command(label="О программе", command=self.open_about)
# Проверка наличия обновлений
def check_for_updates(self):
def on_update_check(update_available, error):
if error:
messagebox.showerror(
"Ошибка проверки обновлений",
f"Не удалось проверить наличие обновлений:\n{error}"
)
elif update_available:
release_info = self.update_checker.get_release_notes()
if release_info:
# Форматируем сообщение
message = (
f"Доступна новая версия {release_info['version']}!\n\n"
f"Тип релиза: {'Пре-релиз' if release_info['type'] == 'prerelease' else 'Стабильный'}\n\n"
"Изменения:\n"
f"{release_info['description']}\n\n"
"Хотите перейти на страницу загрузки?"
)
response = messagebox.askyesno(
"Доступно обновление",
message,
)
if response:
webbrowser.open(release_info["link"])
else:
messagebox.showerror(
"Ошибка",
"Не удалось получить информацию о новой версии"
)
else:
messagebox.showinfo(
"Проверка обновлений",
"У вас установлена последняя версия программы"
)
self.update_checker.check_updates(callback=on_update_check)
# Обработчик изменения настроек
def on_settings_changed(self):
self.settings = settings_load()
# Открытие окна настроек
def open_settings(self):
settings_window = SettingsWindow(self, self.settings, self.on_settings_changed)
settings_window.transient(self)
settings_window.grab_set()
# Проверка первого запуска программы
def check_first_run(self): def check_first_run(self):
"""Проверка первого запуска программы"""
show_settings = False show_settings = False
# Проверяем существование файла настроек # Проверяем существование файла настроек
@@ -595,31 +800,7 @@ class SerialAppGUI(tk.Tk):
if response: if response:
self.open_settings() self.open_settings()
def create_menu(self): # Создание вкладок
menubar = tk.Menu(self)
self.config(menu=menubar)
# Меню "Файл"
file_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Файл", menu=file_menu)
file_menu.add_command(label="Настройки", command=self.open_settings)
file_menu.add_separator()
file_menu.add_command(label="Выход", command=self.quit)
# Меню "Справка"
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Справка", menu=help_menu)
help_menu.add_command(label="О программе", command=self.open_about)
def open_settings(self):
settings_window = SettingsWindow(self, self.settings, self.on_settings_changed)
settings_window.transient(self)
settings_window.grab_set()
def on_settings_changed(self):
# Обновляем настройки в основном приложении
self.settings = settings_load()
def create_tabs(self): def create_tabs(self):
self.notebook = ttk.Notebook(self) self.notebook = ttk.Notebook(self)
self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5) self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5)
@@ -640,23 +821,24 @@ class SerialAppGUI(tk.Tk):
self.create_config_editor_tab(config_editor_frame) self.create_config_editor_tab(config_editor_frame)
self.create_tftp_tab(tftp_frame) self.create_tftp_tab(tftp_frame)
# -------------- Вкладка "Интерактивный режим" -------------- # Создание вкладки "Интерактивный режим"
def create_interactive_tab(self, frame): def create_interactive_tab(self, frame):
control_frame = ttk.Frame(frame) control_frame = ttk.Frame(frame)
control_frame.pack(fill=X, pady=5) control_frame.pack(fill=X, pady=5)
ttk.Button(control_frame, text="Подключиться", command=self.connect_device).pack(side=LEFT, padx=5) ttk.Button(control_frame, text="Подключиться", command=self.connect_device).pack(side=LEFT, padx=5)
ttk.Button(control_frame, text="Отключиться", command=self.disconnect_device).pack(side=LEFT, padx=5) ttk.Button(control_frame, text="Отключиться", command=self.disconnect_device).pack(side=LEFT, padx=5)
self.interactive_text = tk.Text(frame, wrap="word", height=20) self.interactive_text = CustomText(frame, wrap="word", height=20)
self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5) self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
input_frame = ttk.Frame(frame) input_frame = ttk.Frame(frame)
input_frame.pack(fill=X, pady=5) input_frame.pack(fill=X, pady=5)
ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5) ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5)
self.command_entry = ttk.Entry(input_frame, width=50) self.command_entry = CustomEntry(input_frame, width=50)
self.command_entry.pack(side=LEFT, padx=5) self.command_entry.pack(side=LEFT, padx=5)
ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5) ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5)
# Подключение к устройству
def connect_device(self): def connect_device(self):
if self.connection: if self.connection:
messagebox.showinfo("Информация", "Уже подключено.") messagebox.showinfo("Информация", "Уже подключено.")
@@ -670,6 +852,7 @@ class SerialAppGUI(tk.Tk):
else: else:
self.append_interactive_text("[ERROR] Не удалось установить соединение.\n") self.append_interactive_text("[ERROR] Не удалось установить соединение.\n")
# Отключение от устройства
def disconnect_device(self): def disconnect_device(self):
if self.connection: if self.connection:
try: try:
@@ -681,6 +864,7 @@ class SerialAppGUI(tk.Tk):
else: else:
messagebox.showinfo("Информация", "Соединение не установлено.") messagebox.showinfo("Информация", "Соединение не установлено.")
# Отправка команды
def send_command(self): def send_command(self):
if not self.connection: if not self.connection:
messagebox.showerror("Ошибка", "Сначала установите соединение!") messagebox.showerror("Ошибка", "Сначала установите соединение!")
@@ -691,42 +875,19 @@ class SerialAppGUI(tk.Tk):
self.append_interactive_text(f"[INFO] Отправка команды: {cmd}\n") self.append_interactive_text(f"[INFO] Отправка команды: {cmd}\n")
threading.Thread(target=self.process_command, args=(cmd,), daemon=True).start() threading.Thread(target=self.process_command, args=(cmd,), daemon=True).start()
# Обработка команды
def process_command(self, cmd): def process_command(self, cmd):
try: try:
max_attempts = 3 success, response = send_command_and_process_response(
attempt = 0 self.connection,
while attempt < max_attempts: cmd,
self.connection.write((cmd + "\n").encode()) self.settings.get("timeout", 10),
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})") max_attempts=3,
response = read_response( log_callback=self.append_interactive_text,
self.connection, login=self.settings.get("login"),
self.settings.get("timeout", 10), password=self.settings.get("password"),
login=self.settings.get("login"), is_gui=True
password=self.settings.get("password"), )
is_gui=True,
)
if response:
if '^' in response:
self.append_interactive_text(
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
f"Ответ устройства:\n{response}\n"
f"Повторная отправка команды...\n"
)
logging.warning(f"Ошибка в команде: {cmd}. Попытка повторной отправки.")
attempt += 1
time.sleep(1)
continue
else:
self.append_interactive_text(f"[INFO] Ответ устройства:\n{response}\n")
logging.info(f"Получен ответ:\n{response}")
break
else:
self.append_interactive_text("[WARN] Ответ не получен.\n")
logging.warning("Нет ответа от устройства в течение таймаута.")
break
if attempt == max_attempts:
self.append_interactive_text(f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n")
logging.error(f"Команда не выполнена корректно после {max_attempts} попыток: {cmd}")
except SerialException as e: except SerialException as e:
self.append_interactive_text(f"[ERROR] Ошибка при отправке команды: {e}\n") self.append_interactive_text(f"[ERROR] Ошибка при отправке команды: {e}\n")
logging.error(f"Ошибка отправки команды: {e}", exc_info=True) logging.error(f"Ошибка отправки команды: {e}", exc_info=True)
@@ -734,27 +895,27 @@ class SerialAppGUI(tk.Tk):
self.append_interactive_text(f"[ERROR] Неизвестная ошибка: {e}\n") self.append_interactive_text(f"[ERROR] Неизвестная ошибка: {e}\n")
logging.error(f"Неизвестная ошибка: {e}", exc_info=True) logging.error(f"Неизвестная ошибка: {e}", exc_info=True)
# Добавление текста в текстовое поле
def append_interactive_text(self, text): def append_interactive_text(self, text):
self.interactive_text.insert(END, text) append_text_to_widget(self.interactive_text, text)
self.interactive_text.see(END)
# -------------- Вкладка "Выполнить команды из файла" -------------- # Создание вкладки "Выполнить команды из файла"
def create_file_exec_tab(self, frame): def create_file_exec_tab(self, frame):
file_frame = ttk.Frame(frame) file_frame = ttk.Frame(frame)
file_frame.pack(fill=X, pady=5) file_frame.pack(fill=X, pady=5)
ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5) ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
self.file_exec_var = StringVar(value=self.settings.get("config_file") or "") self.file_exec_var = StringVar(value=self.settings.get("config_file") or "")
ttk.Entry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5) CustomEntry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5)
ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5) ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5)
ttk.Button(frame, text="Выполнить команды", command=self.execute_file_commands).pack(pady=5) ttk.Button(frame, text="Выполнить команды", command=self.execute_file_commands).pack(pady=5)
self.file_exec_text = tk.Text(frame, wrap="word", height=15) self.file_exec_text = CustomText(frame, wrap="word", height=15)
self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5) self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Выбор файла конфигурации для выполнения команд
def select_config_file_fileexec(self): def select_config_file_fileexec(self):
filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")]) select_config_file(self, self.file_exec_var)
if filename:
self.file_exec_var.set(filename)
# Выполнение команд из файла
def execute_file_commands(self): def execute_file_commands(self):
if not self.settings.get("port"): if not self.settings.get("port"):
messagebox.showerror("Ошибка", "COM-порт не выбран!") messagebox.showerror("Ошибка", "COM-порт не выбран!")
@@ -784,29 +945,26 @@ class SerialAppGUI(tk.Tk):
).start() ).start()
def append_file_exec_text(self, text): def append_file_exec_text(self, text):
self.file_exec_text.insert(END, text) append_text_to_widget(self.file_exec_text, text)
self.file_exec_text.see(END)
# -------------- Вкладка "Редактор конфигурационного файла" -------------- # Создание вкладки "Редактор конфигурационного файла"
def create_config_editor_tab(self, frame): def create_config_editor_tab(self, frame):
top_frame = ttk.Frame(frame) top_frame = ttk.Frame(frame)
top_frame.pack(fill=X, pady=5) top_frame.pack(fill=X, pady=5)
ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5) ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
self.editor_file_var = StringVar(value=self.settings.get("config_file") or "") self.editor_file_var = StringVar(value=self.settings.get("config_file") or "")
ttk.Entry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5) CustomEntry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5) ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5) ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5) ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5)
self.config_editor_text = tk.Text(frame, wrap="word") self.config_editor_text = CustomText(frame, wrap="word")
self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5) self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Выбор файла конфигурации для редактирования
def select_config_file_editor(self): def select_config_file_editor(self):
filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")]) select_config_file(self, self.editor_file_var, save_to_settings=True)
if filename:
self.editor_file_var.set(filename)
self.settings["config_file"] = filename
settings_save(self.settings)
# Загрузка файла конфигурации
def load_config_file(self): def load_config_file(self):
filename = self.editor_file_var.get() filename = self.editor_file_var.get()
if not filename or not os.path.exists(filename): if not filename or not os.path.exists(filename):
@@ -822,6 +980,7 @@ class SerialAppGUI(tk.Tk):
logging.error(f"Ошибка загрузки файла: {e}", exc_info=True) logging.error(f"Ошибка загрузки файла: {e}", exc_info=True)
messagebox.showerror("Ошибка", f"Не удалось загрузить файл:\n{e}") messagebox.showerror("Ошибка", f"Не удалось загрузить файл:\n{e}")
# Сохранение файла конфигурации
def save_config_file(self): def save_config_file(self):
filename = self.editor_file_var.get() filename = self.editor_file_var.get()
if not filename: if not filename:
@@ -836,13 +995,14 @@ class SerialAppGUI(tk.Tk):
logging.error(f"Ошибка сохранения файла: {e}", exc_info=True) logging.error(f"Ошибка сохранения файла: {e}", exc_info=True)
messagebox.showerror("Ошибка", f"Не удалось сохранить файл:\n{e}") messagebox.showerror("Ошибка", f"Не удалось сохранить файл:\n{e}")
# Открытие окна "О программе"
def open_about(self): def open_about(self):
about_window = AboutWindow(self) about_window = AboutWindow(self)
about_window.transient(self) about_window.transient(self)
about_window.grab_set() about_window.grab_set()
# Создание вкладки TFTP сервера
def create_tftp_tab(self, frame): def create_tftp_tab(self, frame):
"""Создание вкладки TFTP сервера."""
# Создаем фрейм для управления TFTP сервером # Создаем фрейм для управления TFTP сервером
tftp_frame = ttk.Frame(frame) tftp_frame = ttk.Frame(frame)
tftp_frame.pack(fill=BOTH, expand=True, padx=5, pady=5) tftp_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
@@ -868,7 +1028,7 @@ class SerialAppGUI(tk.Tk):
port_frame.pack(fill=X, padx=5, pady=2) port_frame.pack(fill=X, padx=5, pady=2)
ttk.Label(port_frame, text="Порт:").pack(side=LEFT, padx=5) ttk.Label(port_frame, text="Порт:").pack(side=LEFT, padx=5)
self.tftp_port_var = StringVar(value="69") self.tftp_port_var = StringVar(value="69")
self.tftp_port_entry = ttk.Entry(port_frame, textvariable=self.tftp_port_var) self.tftp_port_entry = CustomEntry(port_frame, textvariable=self.tftp_port_var)
self.tftp_port_entry.pack(fill=X, expand=True, padx=5) self.tftp_port_entry.pack(fill=X, expand=True, padx=5)
# Кнопки управления # Кнопки управления
@@ -894,7 +1054,7 @@ class SerialAppGUI(tk.Tk):
log_frame = ttk.LabelFrame(tftp_frame, text="Лог сервера") log_frame = ttk.LabelFrame(tftp_frame, text="Лог сервера")
log_frame.pack(fill=BOTH, expand=True, padx=5, pady=5) log_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
self.tftp_log_text = tk.Text(log_frame, wrap=tk.WORD, height=10) self.tftp_log_text = CustomText(log_frame, wrap=tk.WORD, height=10)
self.tftp_log_text.pack(fill=BOTH, expand=True, padx=5, pady=5) self.tftp_log_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Добавляем скроллбар для лога # Добавляем скроллбар для лога
@@ -930,8 +1090,8 @@ class SerialAppGUI(tk.Tk):
self.tftp_server = None self.tftp_server = None
self.tftp_server_thread = None self.tftp_server_thread = None
# Запуск TFTP сервера
def start_tftp_server(self): def start_tftp_server(self):
"""Запуск TFTP сервера."""
try: try:
# Получаем выбранный IP-адрес # Получаем выбранный IP-адрес
ip = self.tftp_ip_var.get() ip = self.tftp_ip_var.get()
@@ -993,15 +1153,15 @@ class SerialAppGUI(tk.Tk):
self.append_tftp_log(f"[ERROR] Ошибка запуска сервера: {str(e)}") self.append_tftp_log(f"[ERROR] Ошибка запуска сервера: {str(e)}")
messagebox.showerror("Ошибка", f"Не удалось запустить TFTP сервер: {str(e)}") messagebox.showerror("Ошибка", f"Не удалось запустить TFTP сервер: {str(e)}")
# Запуск TFTP сервера в отдельном потоке
def run_tftp_server(self, ip, port): def run_tftp_server(self, ip, port):
"""Запуск TFTP сервера в отдельном потоке."""
try: try:
self.tftp_server.start_server(ip, port) self.tftp_server.start_server(ip, port)
except Exception as e: except Exception as e:
self.append_tftp_log(f"[ERROR] Ошибка работы сервера: {str(e)}") self.append_tftp_log(f"[ERROR] Ошибка работы сервера: {str(e)}")
# Остановка TFTP сервера
def stop_tftp_server(self): def stop_tftp_server(self):
"""Остановка TFTP сервера."""
if self.tftp_server: if self.tftp_server:
try: try:
# Отключаем кнопки на время остановки сервера # Отключаем кнопки на время остановки сервера
@@ -1045,13 +1205,11 @@ class SerialAppGUI(tk.Tk):
self.start_tftp_button.config(state="disabled") self.start_tftp_button.config(state="disabled")
self.stop_tftp_button.config(state="normal") self.stop_tftp_button.config(state="normal")
def append_tftp_log(self, message): def append_tftp_log(self, text):
"""Добавление сообщения в лог TFTP сервера.""" append_text_to_widget(self.tftp_log_text, text)
self.tftp_log_text.insert(END, message + "\n")
self.tftp_log_text.see(END)
# Обновление информации об активных передачах
def update_transfers_info(self): def update_transfers_info(self):
"""Обновление информации об активных передачах."""
if not self.tftp_server: if not self.tftp_server:
return return
@@ -1068,108 +1226,44 @@ class SerialAppGUI(tk.Tk):
# Вычисляем прогресс # Вычисляем прогресс
progress = f"{bytes_sent}/{filesize} байт" progress = f"{bytes_sent}/{filesize} байт"
remaining = filesize - bytes_sent remaining_bytes = filesize - bytes_sent
elapsed_time = time.time() - start_time elapsed_time = time.time() - start_time
# Вычисляем скорость передачи (байт/сек)
if elapsed_time > 0:
transfer_speed = bytes_sent / elapsed_time
# Вычисляем оставшееся время
if transfer_speed > 0:
remaining_time = remaining_bytes / transfer_speed
remaining_str = f"{remaining_bytes} байт (~{int(remaining_time)}с)"
else:
remaining_str = f"{remaining_bytes} байт (неизвестно)"
else:
remaining_str = f"{remaining_bytes} байт (вычисляется...)"
# Добавляем запись в таблицу # Добавляем запись в таблицу
self.transfers_tree.insert("", END, values=( self.transfers_tree.insert("", END, values=(
f"{client_addr[0]}:{client_addr[1]}", f"{client_addr[0]}:{client_addr[1]}",
filename, filename,
progress, progress,
f"{remaining} байт", remaining_str,
f"{elapsed_time:.1f}с" f"{elapsed_time:.1f}с"
)) ))
# Периодическое обновление информации о передачах
def update_transfers_periodically(self): def update_transfers_periodically(self):
"""Периодическое обновление информации о передачах."""
if self.tftp_server and self.tftp_server.running: if self.tftp_server and self.tftp_server.running:
self.update_transfers_info() self.update_transfers_info()
# Планируем следующее обновление через 1 секунду # Планируем следующее обновление через 1 секунду
self.after(1000, self.update_transfers_periodically) self.after(1000, self.update_transfers_periodically)
# Обновление списка сетевых адаптеров
def update_network_adapters(self): def update_network_adapters(self):
"""Обновление списка сетевых адаптеров."""
adapters = get_network_adapters() adapters = get_network_adapters()
self.tftp_ip_combo["values"] = adapters self.tftp_ip_combo["values"] = adapters
if not self.tftp_ip_var.get() in adapters: if not self.tftp_ip_var.get() in adapters:
self.tftp_ip_var.set(adapters[0]) self.tftp_ip_var.set(adapters[0])
# ==========================
# Парсер аргументов (не используется)
# ==========================
# def parse_arguments():
# parser = argparse.ArgumentParser(
# description="Программа для работы с устройствами через последовательный порт с графическим интерфейсом."
# )
# parser.add_argument("--cli", action="store_true", help="Запустить в режиме командной строки (без графики)")
# return parser.parse_args()
# ==========================
# Режим командной строки (не используется)
# ==========================
# def run_cli_mode(settings):
# print("Запущен режим командной строки.")
# while True:
# print("\n--- Главное меню ---")
# print("1. Подключиться и войти в интерактивный режим")
# print("2. Выполнить команды из файла конфигурации")
# print("0. Выход")
# choice = input("Выберите действие: ").strip()
# if choice == "1":
# if not settings.get("port"):
# print("[ERROR] COM-порт не выбран!")
# continue
# conn = create_connection(settings)
# if not conn:
# print("[ERROR] Не удалось установить соединение.")
# continue
# print("[INFO] Введите команды (для выхода введите _exit):")
# while True:
# cmd = input("Команда: ")
# if cmd.strip().lower() == "_exit":
# break
# try:
# conn.write((cmd + "\n").encode())
# response = read_response(
# conn, settings.get("timeout", 10),
# login=settings.get("login"),
# password=settings.get("password"),
# is_gui=False
# )
# print(response)
# except Exception as e:
# print(f"[ERROR] {e}")
# break
# conn.close()
# elif choice == "2":
# if not settings.get("config_file"):
# print("[ERROR] Файл конфигурации не выбран!")
# continue
# if not settings.get("port"):
# print("[ERROR] COM-порт не выбран!")
# continue
# conn = create_connection(settings)
# if conn:
# execute_commands_from_file(
# conn,
# settings["config_file"],
# settings.get("timeout", 10),
# settings.get("copy_mode", "line"),
# settings.get("block_size", 15),
# lambda msg: print(msg),
# login=settings.get("login"),
# password=settings.get("password"),
# is_gui=False,
# )
# conn.close()
# else:
# print("[ERROR] Не удалось установить соединение.")
# elif choice == "0":
# break
# else:
# print("[ERROR] Некорректный выбор.")
# ========================== # ==========================
# Основной запуск приложения # Основной запуск приложения
# ========================== # ==========================

BIN
Icon.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

44
README.md Normal file
View File

@@ -0,0 +1,44 @@
# ComConfigCopy
Программа для копирования конфигураций на коммутаторы.
## Описание
ComConfigCopy - это утилита, разработанная для автоматизации процесса копирования конфигураций на сетевые коммутаторы. Программа предоставляет удобный графический интерфейс для управления процессом копирования и настройки параметров подключения.
## Основные возможности
- Копирование конфигураций на коммутаторы через COM-порт
- Поддержка различных скоростей подключения
- Автоматическое определение доступных COM-портов
- Возможность сохранения и загрузки настроек
- Автоматическое обновление через GitHub
## Системные требования
- Windows 7/8/10/11
- Python 3.8 или выше
- Доступ к COM-портам
## Установка
1. Скачайте последнюю версию программы из [репозитория](https://gitea.filow.ru/LowaSC/ComConfigCopy/releases)
2. Распакуйте архив в удобное место
3. Запустите файл `ComConfigCopy.exe`
## Использование
1. Выберите COM-порт из списка доступных
2. Настройте параметры подключения (скорость, биты данных и т.д.)
3. Выберите файл конфигурации для отправки
4. Нажмите кнопку "Отправить" для начала процесса копирования
## Контакты
- Email: SPRF555@gmail.com
- Telegram: [@LowaSC](https://t.me/LowaSC)
- Репозиторий: [ComConfigCopy](https://gitea.filow.ru/LowaSC/ComConfigCopy)
## Лицензия
Этот проект распространяется под лицензией MIT. Подробности смотрите в файле [LICENSE](LICENSE).

View File

@@ -224,6 +224,8 @@ class TFTPServer:
Передача файла клиенту по протоколу TFTP. Передача файла клиенту по протоколу TFTP.
""" """
BLOCK_SIZE = 512 BLOCK_SIZE = 512
MAX_RETRIES = 5
TIMEOUT = 2.0
transfer_socket = None transfer_socket = None
try: try:
if not os.path.exists(file_path): if not os.path.exists(file_path):
@@ -249,124 +251,99 @@ class TFTPServer:
'start_time': start_time 'start_time': start_time
} }
self.log(f"[INFO] Начало передачи файла '{file_basename}' клиенту {client_addr}. Размер файла: {filesize} байт.") # Создаем новый сокет для передачи данных
# Создаем отдельный сокет для передачи файла
transfer_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) transfer_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
transfer_socket.settimeout(5.0) # Таймаут ожидания ACK transfer_socket.settimeout(TIMEOUT)
# Добавляем сокет в множество активных сокетов
with self.lock: with self.lock:
self.transfer_sockets.add(transfer_socket) self.transfer_sockets.add(transfer_socket)
block_num = 1 self.log(f"[INFO] Начало передачи файла '{file_basename}' клиенту {client_addr}. Размер файла: {filesize} байт.")
bytes_sent = 0
last_progress_time = time.time()
try: with open(file_path, 'rb') as file:
with open(file_path, "rb") as f: block_number = 1
while self.running: # Проверяем флаг running last_successful_block = 0
while True:
# Читаем блок данных
data = file.read(BLOCK_SIZE)
# Формируем и отправляем пакет данных
packet = struct.pack('!HH', 3, block_number) + data
retries = 0
while retries < MAX_RETRIES:
try: try:
data_block = f.read(BLOCK_SIZE) transfer_socket.sendto(packet, client_addr)
if not data_block: # Достигнут конец файла
break # Ожидаем подтверждение
while True:
# Проверяем флаг running перед отправкой блока
if not self.running:
raise Exception("Передача прервана: сервер остановлен")
# Формируем TFTP пакет данных
packet = struct.pack("!HH", 3, block_num) + data_block
attempts = 0
ack_received = False
# Попытка отправки текущего блока (до 3 повторных попыток)
while attempts < 3 and not ack_received and self.running:
if transfer_socket is None:
raise Exception("Сокет передачи закрыт")
try: try:
transfer_socket.sendto(packet, client_addr) ack_data, ack_addr = transfer_socket.recvfrom(4)
if ack_addr == client_addr and len(ack_data) >= 4:
# Логируем прогресс каждую секунду opcode, ack_block = struct.unpack('!HH', ack_data)
current_time = time.time() if opcode == 4: # ACK
if current_time - last_progress_time >= 1.0: if ack_block == block_number:
elapsed_time = current_time - start_time # Успешное подтверждение
remaining = filesize - bytes_sent last_successful_block = block_number
self.log(f"[STATUS] Клиент: {client_addr} | Файл: {file_basename} | " bytes_sent = min((block_number * BLOCK_SIZE), filesize)
f"Отправлено: {bytes_sent}/{filesize} байт | "
f"Осталось: {remaining} байт | " # Обновляем информацию о прогрессе
f"Время: {elapsed_time:.2f} сек.") with self.lock:
last_progress_time = current_time if client_addr in self.active_transfers:
self.active_transfers[client_addr]['bytes_sent'] = bytes_sent
# Ожидаем подтверждение
ack_data, addr = transfer_socket.recvfrom(4) # Логируем статус каждую секунду
if addr == client_addr: current_time = time.time()
ack_opcode, ack_block = struct.unpack("!HH", ack_data) if current_time - start_time >= 1.0:
if ack_opcode == 4 and ack_block == block_num: bytes_remaining = filesize - bytes_sent
ack_received = True elapsed_time = current_time - start_time
bytes_sent += len(data_block) self.log(f"[STATUS] Клиент: {client_addr} | Файл: {file_basename} | "
with self.lock: f"Отправлено: {bytes_sent}/{filesize} байт | "
if client_addr in self.active_transfers: f"Осталось: {bytes_remaining} байт | "
self.active_transfers[client_addr]['bytes_sent'] = bytes_sent f"Время: {elapsed_time:.2f} сек.")
else:
self.log(f"[WARN] Неверный ACK от {client_addr}. " break
f"Ожидался блок {block_num}, получен {ack_block}.") elif ack_block < block_number:
# Получен старый ACK, игнорируем
continue
except socket.timeout: except socket.timeout:
attempts += 1 break
self.log(f"[WARN] Таймаут ожидания ACK для блока {block_num} "
f"от {client_addr}. Попытка {attempts+1}.") if last_successful_block == block_number:
except socket.error as e: break
if not self.running: else:
raise Exception("Передача прервана: сервер остановлен") retries += 1
self.log(f"[ERROR] Ошибка сокета при отправке блока {block_num}: {str(e)}") self.log(f"[WARN] Таймаут ожидания ACK для блока {block_number} от {client_addr}. "
attempts += 1 f"Попытка {retries + 1}.")
except Exception as e:
if not self.running:
raise Exception("Передача прервана: сервер остановлен")
self.log(f"[ERROR] Ошибка при отправке блока {block_num}: {str(e)}")
attempts += 1
if not ack_received:
raise Exception(f"Не удалось получить подтверждение для блока {block_num}")
block_num = (block_num + 1) % 65536
except Exception as e: except Exception as e:
if not self.running: retries += 1
raise Exception("Передача прервана: сервер остановлен") self.log(f"[ERROR] Ошибка при передаче блока {block_number}: {str(e)}")
raise
if retries >= MAX_RETRIES:
self.log(f"[ERROR] Превышено максимальное количество попыток для блока {block_number}")
return
block_number += 1
# Если отправили меньше BLOCK_SIZE байт, это был последний блок
if len(data) < BLOCK_SIZE:
break
if bytes_sent == filesize: self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} завершена успешно")
elapsed_time = time.time() - start_time
self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} "
f"завершена за {elapsed_time:.2f} сек. Всего отправлено {bytes_sent} байт.")
except Exception as e:
if not self.running:
self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} прервана: сервер остановлен")
raise
except Exception as e: except Exception as e:
if not self.running: self.log(f"[ERROR] Ошибка при передаче файла: {str(e)}")
return # Не логируем повторно о прерывании передачи
self.log(f"[ERROR] Ошибка при передаче файла '{os.path.basename(file_path)}' "
f"клиенту {client_addr}: {str(e)}")
try:
self.send_error(client_addr, 0, str(e))
except:
pass
finally: finally:
# Закрываем сокет передачи # Очищаем информацию о передаче
if transfer_socket:
try:
with self.lock:
self.transfer_sockets.discard(transfer_socket)
transfer_socket.close()
transfer_socket = None
except:
pass
# Удаляем информацию о передаче
with self.lock: with self.lock:
if client_addr in self.active_transfers: if client_addr in self.active_transfers:
del self.active_transfers[client_addr] del self.active_transfers[client_addr]
if transfer_socket in self.transfer_sockets:
self.transfer_sockets.remove(transfer_socket)
if transfer_socket:
try:
transfer_socket.close()
except:
pass

View File

@@ -2,19 +2,22 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import tkinter as tk import tkinter as tk
from tkinter import ttk, BOTH, X, BOTTOM from tkinter import ttk, BOTH, X, BOTTOM, END
import webbrowser import webbrowser
class AboutWindow(tk.Toplevel): class AboutWindow(tk.Toplevel):
def __init__(self, parent): def __init__(self, parent):
super().__init__(parent) super().__init__(parent)
self.title("О программе") self.title("О программе")
self.geometry("400x300") self.geometry("600x500")
self.resizable(False, False) self.resizable(False, False)
# Создаем фрейм # Сохраняем ссылку на родительское окно
self.parent = parent
# Создаем фрейм для содержимого
about_frame = ttk.Frame(self, padding="20") about_frame = ttk.Frame(self, padding="20")
about_frame.pack(fill=BOTH, expand=True) about_frame.pack(fill=BOTH, expand=True, padx=10, pady=10)
# Заголовок # Заголовок
ttk.Label( ttk.Label(
@@ -33,7 +36,7 @@ class AboutWindow(tk.Toplevel):
# Версия # Версия
ttk.Label( ttk.Label(
about_frame, about_frame,
text="Версия 1.0", text=f"Версия {getattr(parent, 'VERSION', '1.0.0')}",
font=("Segoe UI", 10) font=("Segoe UI", 10)
).pack(pady=(0, 20)) ).pack(pady=(0, 20))
@@ -66,7 +69,7 @@ class AboutWindow(tk.Toplevel):
ttk.Label( ttk.Label(
contact_frame, contact_frame,
text="Email: LowaWorkMail@gmail.com" text="Email: SPRF555@gmail.com"
).pack(anchor="w") ).pack(anchor="w")
telegram_link = ttk.Label( telegram_link = ttk.Label(
@@ -80,14 +83,14 @@ class AboutWindow(tk.Toplevel):
# Кнопка закрытия # Кнопка закрытия
ttk.Button( ttk.Button(
about_frame, self,
text="Закрыть", text="Закрыть",
command=self.destroy command=self.destroy
).pack(side=BOTTOM, pady=(20, 0)) ).pack(side=BOTTOM, pady=10)
# Центрируем окно # Центрируем окно
self.center_window() self.center_window()
def center_window(self): def center_window(self):
self.update_idletasks() self.update_idletasks()
width = self.winfo_width() width = self.winfo_width()
@@ -95,6 +98,6 @@ class AboutWindow(tk.Toplevel):
x = (self.winfo_screenwidth() // 2) - (width // 2) x = (self.winfo_screenwidth() // 2) - (width // 2)
y = (self.winfo_screenheight() // 2) - (height // 2) y = (self.winfo_screenheight() // 2) - (height // 2)
self.geometry(f"{width}x{height}+{x}+{y}") self.geometry(f"{width}x{height}+{x}+{y}")
def open_url(self, url): def open_url(self, url):
webbrowser.open(url) webbrowser.open(url)

Binary file not shown.

View File

@@ -1 +1,4 @@
tftpy>=0.8.0 tftpy>=0.8.0
pyserial>=3.5
requests>=2.31.0
packaging>=23.2

165
update_checker.py Normal file
View File

@@ -0,0 +1,165 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import requests
import threading
import re
from packaging import version
import xml.etree.ElementTree as ET
import html
class UpdateCheckError(Exception):
"""Исключение для ошибок проверки обновлений"""
pass
class ReleaseType:
"""Типы релизов"""
STABLE = "stable"
PRERELEASE = "prerelease"
class UpdateChecker:
"""Класс для проверки обновлений программы"""
def __init__(self, current_version, repo_url, include_prereleases=False):
self.current_version = current_version
self.repo_url = repo_url
self.include_prereleases = include_prereleases
self.rss_url = f"{repo_url}/releases.rss"
self.release_info = None
def _clean_html(self, html_text):
"""Очищает HTML-разметку и форматирует текст"""
if not html_text:
return ""
text = re.sub(r'<[^>]+>', '', html_text)
text = html.unescape(text)
text = re.sub(r'\n\s*\n', '\n\n', text)
return '\n'.join(line.strip() for line in text.splitlines()).strip()
def _parse_release_info(self, item):
"""Извлекает информацию о релизе из RSS item"""
title = item.find('title').text if item.find('title') is not None else ''
link = item.find('link').text if item.find('link') is not None else ''
description = item.find('description').text if item.find('description') is not None else ''
content = item.find('{http://purl.org/rss/1.0/modules/content/}encoded')
content_text = content.text if content is not None else ''
# Извлекаем версию и проверяем тип релиза из тега
version_match = re.search(r'/releases/tag/(?:pre-)?v?(\d+\.\d+(?:\.\d+)?)', link)
if not version_match:
return None
version_str = version_match.group(1)
# Проверяем наличие префикса pre- в теге
is_prerelease = 'pre-' in link.lower()
# Форматируем название релиза
formatted_title = title
if title == version_str or not title.strip():
# Если заголовок пустой или совпадает с версией, создаем стандартное название
release_type = "Пре-релиз" if is_prerelease else "Версия"
formatted_title = f"{release_type} {version_str}"
elif not re.search(version_str, title):
# Если версия не указана в заголовке, добавляем её
formatted_title = f"{title} ({version_str})"
# Форматируем описание
formatted_description = self._clean_html(content_text or description)
if not formatted_description.strip():
formatted_description = "Нет описания"
# Добавляем метку типа релиза в начало описания
release_type_label = "[Пре-релиз] " if is_prerelease else ""
formatted_description = f"{release_type_label}{formatted_description}"
return {
'title': formatted_title,
'link': link,
'description': formatted_description,
'version': version_str,
'type': ReleaseType.PRERELEASE if is_prerelease else ReleaseType.STABLE
}
def check_updates(self, callback=None):
"""Проверяет наличие обновлений в асинхронном режиме"""
def check_worker():
try:
logging.info(f"Текущая версия программы: {self.current_version}")
logging.info(f"Проверка пре-релизов: {self.include_prereleases}")
logging.info(f"Запрос RSS ленты: {self.rss_url}")
response = requests.get(self.rss_url, timeout=10)
response.raise_for_status()
root = ET.fromstring(response.content)
items = root.findall('.//item')
if not items:
raise UpdateCheckError("Релизы не найдены")
logging.info(f"Найдено {len(items)} релизов")
latest_version = None
latest_info = None
for item in items:
release_info = self._parse_release_info(item)
if not release_info:
continue
is_prerelease = release_info['type'] == ReleaseType.PRERELEASE
logging.info(
f"Проверка релиза: {release_info['title']}, "
f"версия: {release_info['version']}, "
f"тип: {'пре-релиз' if is_prerelease else 'стабильная'}"
)
# Пропускаем пре-релизы если они не включены
if is_prerelease and not self.include_prereleases:
logging.info(f"Пропуск пре-релиза: {release_info['version']}")
continue
# Сравниваем версии
try:
current_ver = version.parse(latest_version or "0.0.0")
new_ver = version.parse(release_info['version'].split('-')[0]) # Убираем суффикс для сравнения
if new_ver > current_ver:
latest_version = release_info['version']
latest_info = release_info
logging.info(f"Новая версия: {latest_version}")
except version.InvalidVersion as e:
logging.warning(f"Некорректный формат версии {release_info['version']}: {e}")
continue
if not latest_info:
raise UpdateCheckError("Не найдены подходящие версии")
self.release_info = latest_info
# Сравниваем с текущей версией
current_ver = version.parse(self.current_version)
latest_ver = version.parse(latest_version.split('-')[0])
update_available = latest_ver > current_ver
logging.info(f"Сравнение версий: текущая {current_ver} <-> последняя {latest_ver}")
logging.info(f"Доступно обновление: {update_available}")
if callback:
callback(update_available, None)
except UpdateCheckError as e:
logging.error(str(e))
if callback:
callback(False, str(e))
except Exception as e:
logging.error(f"Ошибка при проверке обновлений: {e}", exc_info=True)
if callback:
callback(False, str(e))
threading.Thread(target=check_worker, daemon=True).start()
def get_release_notes(self):
"""Возвращает информацию о последнем релизе"""
return self.release_info