6 Commits

Author SHA1 Message Date
2f4b2985cd Remove documentation and CLI mode features
- Remove "Documentation" menu option
- Delete unused CLI mode code
- Clean up commented-out argument parsing function
2025-02-16 05:00:43 +03:00
d1a870fed7 Add update checking and documentation features
- Implement UpdateChecker for version comparison and update notifications
- Add menu options for documentation and update checking
- Enhance AboutWindow with dynamic version display
- Update requirements.txt with new dependencies
- Create infrastructure for opening local documentation
- Improve application menu with additional help options
2025-02-16 04:50:33 +03:00
2e2dd9e705 Add custom text and entry widgets with enhanced copy/paste functionality
- Implement CustomText and CustomEntry classes with advanced text interaction features
- Add context menu for text widgets with cut, copy, paste, and select all options
- Support multiple keyboard shortcuts for text manipulation
- Replace standard Tkinter Text and Entry widgets with custom implementations
- Remove global text/entry widget bindings in favor of class-specific methods
2025-02-16 03:57:48 +03:00
d937042ea2 Update application title to reflect project name
- Change window title from "Serial Device Manager" to "ComConfigCopy"
2025-02-16 03:53:21 +03:00
136c7877d3 Refactor TFTP file transfer with improved reliability and error handling
- Implement more robust file transfer mechanism with configurable retry and timeout settings
- Add detailed logging for transfer progress and error scenarios
- Enhance block transfer logic with better error recovery
- Simplify transfer socket management and cleanup process
- Improve overall transfer reliability and error tracking
2025-02-16 03:50:27 +03:00
467d582095 Improve file transfer progress tracking and display
- Add dynamic transfer speed calculation
- Compute and display estimated remaining transfer time
- Enhance remaining bytes display with more informative status
- Update transfers table with more detailed transfer progress information
2025-02-16 03:43:34 +03:00
6 changed files with 521 additions and 238 deletions

View File

@@ -17,6 +17,7 @@ import re
import sys
import threading
import time
import webbrowser
from getpass import getpass
from logging.handlers import RotatingFileHandler
import tkinter as tk
@@ -40,12 +41,17 @@ from about_window import AboutWindow
from TFTPServer import TFTPServer
# from TFTPServer import TFTPServerThread
import socket
from update_checker import UpdateChecker
# Версия программы
VERSION = "1.0.0"
# Создаем необходимые папки
os.makedirs("Logs", exist_ok=True)
os.makedirs("Configs", exist_ok=True)
os.makedirs("Settings", exist_ok=True)
os.makedirs("Firmware", exist_ok=True)
os.makedirs("docs", exist_ok=True)
# Файл настроек находится в папке Settings
SETTINGS_FILE = os.path.join("Settings", "settings.json")
@@ -437,9 +443,109 @@ def execute_commands_from_file(
logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True)
# ==========================
# Графический интерфейс (Tkinter)
# Улучшенные текстовые виджеты
# ==========================
class CustomText(tk.Text):
"""Улучшенный текстовый виджет с расширенной функциональностью копирования/вставки"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.create_context_menu()
self.bind_shortcuts()
def create_context_menu(self):
self.context_menu = tk.Menu(self, tearoff=0)
self.context_menu.add_command(label="Вырезать", command=self.cut)
self.context_menu.add_command(label="Копировать", command=self.copy)
self.context_menu.add_command(label="Вставить", command=self.paste)
self.context_menu.add_separator()
self.context_menu.add_command(label="Выделить всё", command=self.select_all)
self.bind("<Button-3>", self.show_context_menu)
def show_context_menu(self, event):
self.context_menu.post(event.x_root, event.y_root)
def bind_shortcuts(self):
# Стандартные сочетания
self.bind("<Control-x>", lambda e: self.event_generate("<<Cut>>"))
self.bind("<Control-c>", lambda e: self.event_generate("<<Copy>>"))
self.bind("<Control-v>", lambda e: self.event_generate("<<Paste>>"))
self.bind("<Control-a>", self.select_all)
# Shift+Insert для вставки
self.bind("<Shift-Insert>", lambda e: self.event_generate("<<Paste>>"))
# Ctrl+Insert для копирования
self.bind("<Control-Insert>", lambda e: self.event_generate("<<Copy>>"))
# Shift+Delete для вырезания
self.bind("<Shift-Delete>", lambda e: self.event_generate("<<Cut>>"))
def cut(self):
self.event_generate("<<Cut>>")
def copy(self):
self.event_generate("<<Copy>>")
def paste(self):
self.event_generate("<<Paste>>")
def select_all(self, event=None):
self.tag_add(tk.SEL, "1.0", tk.END)
self.mark_set(tk.INSERT, "1.0")
self.see(tk.INSERT)
return "break"
class CustomEntry(ttk.Entry):
"""Улучшенное поле ввода с расширенной функциональностью копирования/вставки"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.create_context_menu()
self.bind_shortcuts()
def create_context_menu(self):
self.context_menu = tk.Menu(self, tearoff=0)
self.context_menu.add_command(label="Вырезать", command=self.cut)
self.context_menu.add_command(label="Копировать", command=self.copy)
self.context_menu.add_command(label="Вставить", command=self.paste)
self.context_menu.add_separator()
self.context_menu.add_command(label="Выделить всё", command=self.select_all)
self.bind("<Button-3>", self.show_context_menu)
def show_context_menu(self, event):
self.context_menu.post(event.x_root, event.y_root)
def bind_shortcuts(self):
# Стандартные сочетания
self.bind("<Control-x>", lambda e: self.event_generate("<<Cut>>"))
self.bind("<Control-c>", lambda e: self.event_generate("<<Copy>>"))
self.bind("<Control-v>", lambda e: self.event_generate("<<Paste>>"))
self.bind("<Control-a>", self.select_all)
# Shift+Insert для вставки
self.bind("<Shift-Insert>", lambda e: self.event_generate("<<Paste>>"))
# Ctrl+Insert для копирования
self.bind("<Control-Insert>", lambda e: self.event_generate("<<Copy>>"))
# Shift+Delete для вырезания
self.bind("<Shift-Delete>", lambda e: self.event_generate("<<Cut>>"))
def cut(self):
self.event_generate("<<Cut>>")
def copy(self):
self.event_generate("<<Copy>>")
def paste(self):
self.event_generate("<<Paste>>")
def select_all(self, event=None):
self.select_range(0, tk.END)
return "break"
class SettingsWindow(tk.Toplevel):
def __init__(self, parent, settings, callback=None):
super().__init__(parent)
@@ -486,13 +592,13 @@ class SettingsWindow(tk.Toplevel):
# Размер блока
ttk.Label(settings_frame, text="Размер блока:").grid(row=4, column=0, sticky=W, pady=5)
self.block_size_var = StringVar(value=str(settings.get("block_size", 15)))
block_size_entry = ttk.Entry(settings_frame, textvariable=self.block_size_var)
block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var)
block_size_entry.grid(row=4, column=1, sticky=W, pady=5)
# Приглашение командной строки
ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5)
self.prompt_var = StringVar(value=settings.get("prompt", ">"))
prompt_entry = ttk.Entry(settings_frame, textvariable=self.prompt_var)
prompt_entry = CustomEntry(settings_frame, textvariable=self.prompt_var)
prompt_entry.grid(row=5, column=1, sticky=W, pady=5)
# Кнопки
@@ -541,30 +647,94 @@ class SettingsWindow(tk.Toplevel):
class SerialAppGUI(tk.Tk):
def __init__(self, settings):
super().__init__()
self.title("Serial Device Manager")
self.title("ComConfigCopy")
self.geometry("900x700")
# Добавляем VERSION как атрибут класса
self.VERSION = VERSION
# Инициализация проверки обновлений
self.update_checker = UpdateChecker(
VERSION,
"https://gitea.filow.ru/LowaSC/ComConfigCopy"
)
# Настройка стиля
self.style = ttk.Style(self)
self.style.theme_use("clam")
default_font = ("Segoe UI", 10)
self.option_add("*Font", default_font)
self.settings = settings
self.connection = None
self.tftp_server = None
# Глобальные биндинги
self.bind_class("Text", "<Control-c>", lambda event: event.widget.event_generate("<<Copy>>"))
self.bind_class("Text", "<Control-v>", lambda event: event.widget.event_generate("<<Paste>>"))
self.bind_class("Text", "<Control-x>", lambda event: event.widget.event_generate("<<Cut>>"))
self.bind_class("Entry", "<Control-c>", lambda event: event.widget.event_generate("<<Copy>>"))
self.bind_class("Entry", "<Control-v>", lambda event: event.widget.event_generate("<<Paste>>"))
self.bind_class("Entry", "<Control-x>", lambda event: event.widget.event_generate("<<Cut>>"))
self.create_menu()
self.create_tabs()
# Проверка первого запуска
self.check_first_run()
def create_menu(self):
menubar = tk.Menu(self)
self.config(menu=menubar)
# Меню "Файл"
file_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Файл", menu=file_menu)
file_menu.add_command(label="Настройки", command=self.open_settings)
file_menu.add_separator()
file_menu.add_command(label="Выход", command=self.quit)
# Меню "Справка"
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Справка", menu=help_menu)
help_menu.add_command(label="Проверить обновления", command=self.check_for_updates)
help_menu.add_separator()
help_menu.add_command(label="О программе", command=self.open_about)
def check_for_updates(self):
"""Проверка наличия обновлений"""
def on_update_check(update_available, error):
if error:
messagebox.showerror(
"Ошибка проверки обновлений",
f"Не удалось проверить наличие обновлений:\n{error}"
)
elif update_available:
release_info = self.update_checker.get_release_notes()
if release_info:
response = messagebox.askyesno(
"Доступно обновление",
f"Доступна новая версия {release_info['version']}!\n\n"
f"Изменения:\n{release_info['description']}\n\n"
"Хотите перейти на страницу загрузки?",
)
if response:
webbrowser.open(release_info["download_url"])
else:
messagebox.showerror(
"Ошибка",
"Не удалось получить информацию о новой версии"
)
else:
messagebox.showinfo(
"Проверка обновлений",
"У вас установлена последняя версия программы"
)
self.update_checker.check_updates(callback=on_update_check)
def on_settings_changed(self):
"""Обработчик изменения настроек"""
self.settings = settings_load()
def open_settings(self):
"""Открытие окна настроек"""
settings_window = SettingsWindow(self, self.settings, self.on_settings_changed)
settings_window.transient(self)
settings_window.grab_set()
def check_first_run(self):
"""Проверка первого запуска программы"""
show_settings = False
@@ -595,31 +765,6 @@ class SerialAppGUI(tk.Tk):
if response:
self.open_settings()
def create_menu(self):
menubar = tk.Menu(self)
self.config(menu=menubar)
# Меню "Файл"
file_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Файл", menu=file_menu)
file_menu.add_command(label="Настройки", command=self.open_settings)
file_menu.add_separator()
file_menu.add_command(label="Выход", command=self.quit)
# Меню "Справка"
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Справка", menu=help_menu)
help_menu.add_command(label="О программе", command=self.open_about)
def open_settings(self):
settings_window = SettingsWindow(self, self.settings, self.on_settings_changed)
settings_window.transient(self)
settings_window.grab_set()
def on_settings_changed(self):
# Обновляем настройки в основном приложении
self.settings = settings_load()
def create_tabs(self):
self.notebook = ttk.Notebook(self)
self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5)
@@ -647,13 +792,13 @@ class SerialAppGUI(tk.Tk):
ttk.Button(control_frame, text="Подключиться", command=self.connect_device).pack(side=LEFT, padx=5)
ttk.Button(control_frame, text="Отключиться", command=self.disconnect_device).pack(side=LEFT, padx=5)
self.interactive_text = tk.Text(frame, wrap="word", height=20)
self.interactive_text = CustomText(frame, wrap="word", height=20)
self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
input_frame = ttk.Frame(frame)
input_frame.pack(fill=X, pady=5)
ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5)
self.command_entry = ttk.Entry(input_frame, width=50)
self.command_entry = CustomEntry(input_frame, width=50)
self.command_entry.pack(side=LEFT, padx=5)
ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5)
@@ -744,10 +889,10 @@ class SerialAppGUI(tk.Tk):
file_frame.pack(fill=X, pady=5)
ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
self.file_exec_var = StringVar(value=self.settings.get("config_file") or "")
ttk.Entry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5)
CustomEntry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5)
ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5)
ttk.Button(frame, text="Выполнить команды", command=self.execute_file_commands).pack(pady=5)
self.file_exec_text = tk.Text(frame, wrap="word", height=15)
self.file_exec_text = CustomText(frame, wrap="word", height=15)
self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
def select_config_file_fileexec(self):
@@ -793,11 +938,11 @@ class SerialAppGUI(tk.Tk):
top_frame.pack(fill=X, pady=5)
ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
self.editor_file_var = StringVar(value=self.settings.get("config_file") or "")
ttk.Entry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5)
CustomEntry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5)
self.config_editor_text = tk.Text(frame, wrap="word")
self.config_editor_text = CustomText(frame, wrap="word")
self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
def select_config_file_editor(self):
@@ -868,7 +1013,7 @@ class SerialAppGUI(tk.Tk):
port_frame.pack(fill=X, padx=5, pady=2)
ttk.Label(port_frame, text="Порт:").pack(side=LEFT, padx=5)
self.tftp_port_var = StringVar(value="69")
self.tftp_port_entry = ttk.Entry(port_frame, textvariable=self.tftp_port_var)
self.tftp_port_entry = CustomEntry(port_frame, textvariable=self.tftp_port_var)
self.tftp_port_entry.pack(fill=X, expand=True, padx=5)
# Кнопки управления
@@ -894,7 +1039,7 @@ class SerialAppGUI(tk.Tk):
log_frame = ttk.LabelFrame(tftp_frame, text="Лог сервера")
log_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
self.tftp_log_text = tk.Text(log_frame, wrap=tk.WORD, height=10)
self.tftp_log_text = CustomText(log_frame, wrap=tk.WORD, height=10)
self.tftp_log_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Добавляем скроллбар для лога
@@ -1068,15 +1213,27 @@ class SerialAppGUI(tk.Tk):
# Вычисляем прогресс
progress = f"{bytes_sent}/{filesize} байт"
remaining = filesize - bytes_sent
remaining_bytes = filesize - bytes_sent
elapsed_time = time.time() - start_time
# Вычисляем скорость передачи (байт/сек)
if elapsed_time > 0:
transfer_speed = bytes_sent / elapsed_time
# Вычисляем оставшееся время
if transfer_speed > 0:
remaining_time = remaining_bytes / transfer_speed
remaining_str = f"{remaining_bytes} байт (~{int(remaining_time)}с)"
else:
remaining_str = f"{remaining_bytes} байт (неизвестно)"
else:
remaining_str = f"{remaining_bytes} байт (вычисляется...)"
# Добавляем запись в таблицу
self.transfers_tree.insert("", END, values=(
f"{client_addr[0]}:{client_addr[1]}",
filename,
progress,
f"{remaining} байт",
remaining_str,
f"{elapsed_time:.1f}с"
))
@@ -1094,82 +1251,6 @@ class SerialAppGUI(tk.Tk):
if not self.tftp_ip_var.get() in adapters:
self.tftp_ip_var.set(adapters[0])
# ==========================
# Парсер аргументов (не используется)
# ==========================
# def parse_arguments():
# parser = argparse.ArgumentParser(
# description="Программа для работы с устройствами через последовательный порт с графическим интерфейсом."
# )
# parser.add_argument("--cli", action="store_true", help="Запустить в режиме командной строки (без графики)")
# return parser.parse_args()
# ==========================
# Режим командной строки (не используется)
# ==========================
# def run_cli_mode(settings):
# print("Запущен режим командной строки.")
# while True:
# print("\n--- Главное меню ---")
# print("1. Подключиться и войти в интерактивный режим")
# print("2. Выполнить команды из файла конфигурации")
# print("0. Выход")
# choice = input("Выберите действие: ").strip()
# if choice == "1":
# if not settings.get("port"):
# print("[ERROR] COM-порт не выбран!")
# continue
# conn = create_connection(settings)
# if not conn:
# print("[ERROR] Не удалось установить соединение.")
# continue
# print("[INFO] Введите команды (для выхода введите _exit):")
# while True:
# cmd = input("Команда: ")
# if cmd.strip().lower() == "_exit":
# break
# try:
# conn.write((cmd + "\n").encode())
# response = read_response(
# conn, settings.get("timeout", 10),
# login=settings.get("login"),
# password=settings.get("password"),
# is_gui=False
# )
# print(response)
# except Exception as e:
# print(f"[ERROR] {e}")
# break
# conn.close()
# elif choice == "2":
# if not settings.get("config_file"):
# print("[ERROR] Файл конфигурации не выбран!")
# continue
# if not settings.get("port"):
# print("[ERROR] COM-порт не выбран!")
# continue
# conn = create_connection(settings)
# if conn:
# execute_commands_from_file(
# conn,
# settings["config_file"],
# settings.get("timeout", 10),
# settings.get("copy_mode", "line"),
# settings.get("block_size", 15),
# lambda msg: print(msg),
# login=settings.get("login"),
# password=settings.get("password"),
# is_gui=False,
# )
# conn.close()
# else:
# print("[ERROR] Не удалось установить соединение.")
# elif choice == "0":
# break
# else:
# print("[ERROR] Некорректный выбор.")
# ==========================
# Основной запуск приложения
# ==========================

44
README.md Normal file
View File

@@ -0,0 +1,44 @@
# ComConfigCopy
Программа для копирования конфигураций на коммутаторы.
## Описание
ComConfigCopy - это утилита, разработанная для автоматизации процесса копирования конфигураций на сетевые коммутаторы. Программа предоставляет удобный графический интерфейс для управления процессом копирования и настройки параметров подключения.
## Основные возможности
- Копирование конфигураций на коммутаторы через COM-порт
- Поддержка различных скоростей подключения
- Автоматическое определение доступных COM-портов
- Возможность сохранения и загрузки настроек
- Автоматическое обновление через GitHub
## Системные требования
- Windows 7/8/10/11
- Python 3.8 или выше
- Доступ к COM-портам
## Установка
1. Скачайте последнюю версию программы из [репозитория](https://gitea.filow.ru/LowaSC/ComConfigCopy/releases)
2. Распакуйте архив в удобное место
3. Запустите файл `ComConfigCopy.exe`
## Использование
1. Выберите COM-порт из списка доступных
2. Настройте параметры подключения (скорость, биты данных и т.д.)
3. Выберите файл конфигурации для отправки
4. Нажмите кнопку "Отправить" для начала процесса копирования
## Контакты
- Email: LowaWorkMail@gmail.com
- Telegram: [@LowaSC](https://t.me/LowaSC)
- Репозиторий: [ComConfigCopy](https://gitea.filow.ru/LowaSC/ComConfigCopy)
## Лицензия
Этот проект распространяется под лицензией MIT. Подробности смотрите в файле [LICENSE](LICENSE).

View File

@@ -224,6 +224,8 @@ class TFTPServer:
Передача файла клиенту по протоколу TFTP.
"""
BLOCK_SIZE = 512
MAX_RETRIES = 5
TIMEOUT = 2.0
transfer_socket = None
try:
if not os.path.exists(file_path):
@@ -249,124 +251,99 @@ class TFTPServer:
'start_time': start_time
}
self.log(f"[INFO] Начало передачи файла '{file_basename}' клиенту {client_addr}. Размер файла: {filesize} байт.")
# Создаем отдельный сокет для передачи файла
# Создаем новый сокет для передачи данных
transfer_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
transfer_socket.settimeout(5.0) # Таймаут ожидания ACK
transfer_socket.settimeout(TIMEOUT)
# Добавляем сокет в множество активных сокетов
with self.lock:
self.transfer_sockets.add(transfer_socket)
block_num = 1
bytes_sent = 0
last_progress_time = time.time()
self.log(f"[INFO] Начало передачи файла '{file_basename}' клиенту {client_addr}. Размер файла: {filesize} байт.")
try:
with open(file_path, "rb") as f:
while self.running: # Проверяем флаг running
with open(file_path, 'rb') as file:
block_number = 1
last_successful_block = 0
while True:
# Читаем блок данных
data = file.read(BLOCK_SIZE)
# Формируем и отправляем пакет данных
packet = struct.pack('!HH', 3, block_number) + data
retries = 0
while retries < MAX_RETRIES:
try:
data_block = f.read(BLOCK_SIZE)
if not data_block: # Достигнут конец файла
break
# Проверяем флаг running перед отправкой блока
if not self.running:
raise Exception("Передача прервана: сервер остановлен")
# Формируем TFTP пакет данных
packet = struct.pack("!HH", 3, block_num) + data_block
attempts = 0
ack_received = False
# Попытка отправки текущего блока (до 3 повторных попыток)
while attempts < 3 and not ack_received and self.running:
if transfer_socket is None:
raise Exception("Сокет передачи закрыт")
transfer_socket.sendto(packet, client_addr)
# Ожидаем подтверждение
while True:
try:
transfer_socket.sendto(packet, client_addr)
# Логируем прогресс каждую секунду
current_time = time.time()
if current_time - last_progress_time >= 1.0:
elapsed_time = current_time - start_time
remaining = filesize - bytes_sent
self.log(f"[STATUS] Клиент: {client_addr} | Файл: {file_basename} | "
f"Отправлено: {bytes_sent}/{filesize} байт | "
f"Осталось: {remaining} байт | "
f"Время: {elapsed_time:.2f} сек.")
last_progress_time = current_time
# Ожидаем подтверждение
ack_data, addr = transfer_socket.recvfrom(4)
if addr == client_addr:
ack_opcode, ack_block = struct.unpack("!HH", ack_data)
if ack_opcode == 4 and ack_block == block_num:
ack_received = True
bytes_sent += len(data_block)
with self.lock:
if client_addr in self.active_transfers:
self.active_transfers[client_addr]['bytes_sent'] = bytes_sent
else:
self.log(f"[WARN] Неверный ACK от {client_addr}. "
f"Ожидался блок {block_num}, получен {ack_block}.")
ack_data, ack_addr = transfer_socket.recvfrom(4)
if ack_addr == client_addr and len(ack_data) >= 4:
opcode, ack_block = struct.unpack('!HH', ack_data)
if opcode == 4: # ACK
if ack_block == block_number:
# Успешное подтверждение
last_successful_block = block_number
bytes_sent = min((block_number * BLOCK_SIZE), filesize)
# Обновляем информацию о прогрессе
with self.lock:
if client_addr in self.active_transfers:
self.active_transfers[client_addr]['bytes_sent'] = bytes_sent
# Логируем статус каждую секунду
current_time = time.time()
if current_time - start_time >= 1.0:
bytes_remaining = filesize - bytes_sent
elapsed_time = current_time - start_time
self.log(f"[STATUS] Клиент: {client_addr} | Файл: {file_basename} | "
f"Отправлено: {bytes_sent}/{filesize} байт | "
f"Осталось: {bytes_remaining} байт | "
f"Время: {elapsed_time:.2f} сек.")
break
elif ack_block < block_number:
# Получен старый ACK, игнорируем
continue
except socket.timeout:
attempts += 1
self.log(f"[WARN] Таймаут ожидания ACK для блока {block_num} "
f"от {client_addr}. Попытка {attempts+1}.")
except socket.error as e:
if not self.running:
raise Exception("Передача прервана: сервер остановлен")
self.log(f"[ERROR] Ошибка сокета при отправке блока {block_num}: {str(e)}")
attempts += 1
except Exception as e:
if not self.running:
raise Exception("Передача прервана: сервер остановлен")
self.log(f"[ERROR] Ошибка при отправке блока {block_num}: {str(e)}")
attempts += 1
if not ack_received:
raise Exception(f"Не удалось получить подтверждение для блока {block_num}")
block_num = (block_num + 1) % 65536
break
if last_successful_block == block_number:
break
else:
retries += 1
self.log(f"[WARN] Таймаут ожидания ACK для блока {block_number} от {client_addr}. "
f"Попытка {retries + 1}.")
except Exception as e:
if not self.running:
raise Exception("Передача прервана: сервер остановлен")
raise
retries += 1
self.log(f"[ERROR] Ошибка при передаче блока {block_number}: {str(e)}")
if retries >= MAX_RETRIES:
self.log(f"[ERROR] Превышено максимальное количество попыток для блока {block_number}")
return
block_number += 1
# Если отправили меньше BLOCK_SIZE байт, это был последний блок
if len(data) < BLOCK_SIZE:
break
if bytes_sent == filesize:
elapsed_time = time.time() - start_time
self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} "
f"завершена за {elapsed_time:.2f} сек. Всего отправлено {bytes_sent} байт.")
except Exception as e:
if not self.running:
self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} прервана: сервер остановлен")
raise
self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} завершена успешно")
except Exception as e:
if not self.running:
return # Не логируем повторно о прерывании передачи
self.log(f"[ERROR] Ошибка при передаче файла '{os.path.basename(file_path)}' "
f"клиенту {client_addr}: {str(e)}")
try:
self.send_error(client_addr, 0, str(e))
except:
pass
self.log(f"[ERROR] Ошибка при передаче файла: {str(e)}")
finally:
# Закрываем сокет передачи
if transfer_socket:
try:
with self.lock:
self.transfer_sockets.discard(transfer_socket)
transfer_socket.close()
transfer_socket = None
except:
pass
# Удаляем информацию о передаче
# Очищаем информацию о передаче
with self.lock:
if client_addr in self.active_transfers:
del self.active_transfers[client_addr]
del self.active_transfers[client_addr]
if transfer_socket in self.transfer_sockets:
self.transfer_sockets.remove(transfer_socket)
if transfer_socket:
try:
transfer_socket.close()
except:
pass

View File

@@ -2,19 +2,22 @@
# -*- coding: utf-8 -*-
import tkinter as tk
from tkinter import ttk, BOTH, X, BOTTOM
from tkinter import ttk, BOTH, X, BOTTOM, END
import webbrowser
class AboutWindow(tk.Toplevel):
def __init__(self, parent):
super().__init__(parent)
self.title("О программе")
self.geometry("400x300")
self.geometry("600x500")
self.resizable(False, False)
# Создаем фрейм
# Сохраняем ссылку на родительское окно
self.parent = parent
# Создаем фрейм для содержимого
about_frame = ttk.Frame(self, padding="20")
about_frame.pack(fill=BOTH, expand=True)
about_frame.pack(fill=BOTH, expand=True, padx=10, pady=10)
# Заголовок
ttk.Label(
@@ -33,7 +36,7 @@ class AboutWindow(tk.Toplevel):
# Версия
ttk.Label(
about_frame,
text="Версия 1.0",
text=f"Версия {getattr(parent, 'VERSION', '1.0.0')}",
font=("Segoe UI", 10)
).pack(pady=(0, 20))
@@ -80,14 +83,14 @@ class AboutWindow(tk.Toplevel):
# Кнопка закрытия
ttk.Button(
about_frame,
self,
text="Закрыть",
command=self.destroy
).pack(side=BOTTOM, pady=(20, 0))
).pack(side=BOTTOM, pady=10)
# Центрируем окно
self.center_window()
def center_window(self):
self.update_idletasks()
width = self.winfo_width()
@@ -95,6 +98,6 @@ class AboutWindow(tk.Toplevel):
x = (self.winfo_screenwidth() // 2) - (width // 2)
y = (self.winfo_screenheight() // 2) - (height // 2)
self.geometry(f"{width}x{height}+{x}+{y}")
def open_url(self, url):
webbrowser.open(url)

View File

@@ -1 +1,4 @@
tftpy>=0.8.0
tftpy>=0.8.0
pyserial>=3.5
requests>=2.31.0
packaging>=23.2

175
update_checker.py Normal file
View File

@@ -0,0 +1,175 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import logging
import requests
import threading
from packaging import version
class UpdateCheckError(Exception):
"""Исключение для ошибок проверки обновлений"""
pass
class UpdateChecker:
"""Класс для проверки обновлений программы"""
def __init__(self, current_version, repo_url):
self.current_version = current_version
self.repo_url = repo_url
# Формируем базовый URL API
self.api_url = repo_url.replace("gitea.filow.ru", "gitea.filow.ru/api/v1/repos/LowaSC/ComConfigCopy")
self._update_available = False
self._latest_version = None
self._latest_release = None
self._error = None
self._changelog = None
def get_changelog(self, callback=None):
"""
Получение changelog из репозитория.
:param callback: Функция обратного вызова, которая будет вызвана после получения changelog
"""
def fetch():
try:
# Пытаемся получить CHANGELOG.md из репозитория
response = requests.get(f"{self.api_url}/contents/CHANGELOG.md", timeout=10)
response.raise_for_status()
content = response.json()
if "content" in content:
import base64
changelog_content = base64.b64decode(content["content"]).decode("utf-8")
self._changelog = changelog_content
self._error = None
else:
raise UpdateCheckError("Не удалось получить содержимое CHANGELOG.md")
except requests.RequestException as e:
error_msg = f"Ошибка получения changelog: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
self._changelog = None
except Exception as e:
error_msg = f"Неизвестная ошибка при получении changelog: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
self._changelog = None
finally:
if callback:
callback(self._changelog, self._error)
# Запускаем получение в отдельном потоке
threading.Thread(target=fetch, daemon=True).start()
def check_updates(self, callback=None):
"""
Проверка наличия обновлений.
:param callback: Функция обратного вызова, которая будет вызвана после проверки
"""
def check():
try:
response = requests.get(f"{self.api_url}/releases", timeout=10)
response.raise_for_status()
releases = response.json()
if not releases:
raise UpdateCheckError("Не найдено релизов в репозитории")
latest_release = releases[0]
latest_version = latest_release.get("tag_name", "").lstrip("v")
if not latest_version:
raise UpdateCheckError("Не удалось определить версию последнего релиза")
try:
if version.parse(latest_version) > version.parse(self.current_version):
self._update_available = True
self._latest_version = latest_version
self._latest_release = latest_release
logging.info(f"Доступно обновление: {latest_version}")
else:
logging.info("Обновления не требуются")
except version.InvalidVersion as e:
raise UpdateCheckError(f"Некорректный формат версии: {e}")
self._error = None
except requests.RequestException as e:
error_msg = f"Ошибка сетевого подключения: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
except UpdateCheckError as e:
logging.error(str(e), exc_info=True)
self._error = str(e)
except Exception as e:
error_msg = f"Неизвестная ошибка при проверке обновлений: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
finally:
if callback:
callback(self._update_available, self._error)
@property
def update_available(self):
"""Доступно ли обновление"""
return self._update_available
@property
def latest_version(self):
"""Последняя доступная версия"""
return self._latest_version
@property
def error(self):
"""Последняя ошибка при проверке обновлений"""
return self._error
@property
def changelog(self):
"""Текущий changelog"""
return self._changelog
def get_release_notes(self):
"""Получение информации о последнем релизе"""
if self._latest_release:
return {
"version": self._latest_version,
"description": self._latest_release.get("body", ""),
"download_url": self._latest_release.get("assets", [{}])[0].get("browser_download_url", "")
}
return None
def get_releases(self, callback=None):
"""
Получение списка релизов из репозитория.
:param callback: Функция обратного вызова, которая будет вызвана после получения списка релизов
"""
def fetch():
try:
response = requests.get(f"{self.api_url}/releases", timeout=10)
response.raise_for_status()
releases = response.json()
if not releases:
raise UpdateCheckError("Не найдено релизов в репозитории")
self._error = None
if callback:
callback(releases, None)
except requests.RequestException as e:
error_msg = f"Ошибка сетевого подключения: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
if callback:
callback(None, error_msg)
except Exception as e:
error_msg = f"Ошибка при получении списка релизов: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
if callback:
callback(None, error_msg)
# Запускаем получение в отдельном потоке
threading.Thread(target=fetch, daemon=True).start()