#!/usr/bin/env python3 # -*- coding: utf-8 -*- # ------------------------------------------------------------ # Это программа для копирования конфигураций на коммутаторы # ------------------------------------------------------------ import json import logging import os import re import sys import threading import time import webbrowser from getpass import getpass from logging.handlers import RotatingFileHandler import tkinter as tk from tkinter import ( StringVar, END, BOTH, LEFT, RIGHT, X, W, filedialog, messagebox, simpledialog, ) from tkinter import ttk import serial import serial.tools.list_ports from serial.serialutil import SerialException from about_window import AboutWindow from TFTPServer import TFTPServer import socket from update_checker import UpdateChecker # Версия программы VERSION = "1.0.2" # Создаем необходимые папки os.makedirs("Logs", exist_ok=True) os.makedirs("Configs", exist_ok=True) os.makedirs("Settings", exist_ok=True) os.makedirs("Firmware", exist_ok=True) os.makedirs("docs", exist_ok=True) # Файл настроек находится в папке Settings SETTINGS_FILE = os.path.join("Settings", "settings.json") # Настройка логирования с использованием RotatingFileHandler. def setup_logging(): logger = logging.getLogger() logger.setLevel(logging.DEBUG) log_path = os.path.join("Logs", "app.log") handler = RotatingFileHandler( log_path, maxBytes=5 * 1024 * 1024, backupCount=3, encoding="utf-8" ) formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") handler.setFormatter(formatter) logger.addHandler(handler) # Загрузка настроек из JSON-файла или создание настроек по умолчанию. def settings_load(): default_settings = { "port": None, # Порт для подключения "baudrate": 9600, # Скорость передачи данных "config_file": None, # Файл конфигурации "login": None, # Логин для подключения "password": None, # Пароль для подключения "timeout": 10, # Таймаут подключения "copy_mode": "line", # Режим копирования "block_size": 15, # Размер блока команд "prompt": ">", # Используется для определения приглашения } # Создаем папку Settings, если её нет os.makedirs("Settings", exist_ok=True) if not os.path.exists(SETTINGS_FILE): try: # При первом запуске создаем новый файл настроек with open(SETTINGS_FILE, "w", encoding="utf-8") as f: json.dump(default_settings, f, indent=4, ensure_ascii=False) logging.info("Файл настроек создан с настройками по умолчанию.") return default_settings except Exception as e: logging.error(f"Ошибка при создании файла настроек: {e}", exc_info=True) return default_settings try: # Пытаемся загрузить существующие настройки with open(SETTINGS_FILE, "r", encoding="utf-8") as f: settings = json.load(f) # Проверяем наличие всех необходимых параметров settings_changed = False for key, value in default_settings.items(): if key not in settings: settings[key] = value settings_changed = True # Если были добавлены новые параметры, сохраняем обновленные настройки if settings_changed: try: with open(SETTINGS_FILE, "w", encoding="utf-8") as f: json.dump(settings, f, indent=4, ensure_ascii=False) logging.info("Файл настроек обновлен с новыми параметрами.") except Exception as e: logging.error(f"Ошибка при обновлении файла настроек: {e}", exc_info=True) return settings except Exception as e: logging.error(f"Ошибка загрузки файла настроек: {e}", exc_info=True) return default_settings # Сохранение настроек в JSON-файл def settings_save(settings): try: with open(SETTINGS_FILE, "w", encoding="utf-8") as f: json.dump(settings, f, indent=4, ensure_ascii=False) logging.info("Настройки сохранены в файл.") except Exception as e: logging.error(f"Ошибка при сохранении настроек: {e}", exc_info=True) # Получение списка доступных последовательных портов def list_serial_ports(): """Получение списка доступных последовательных портов.""" ports = serial.tools.list_ports.comports() logging.debug(f"Найдено {len(ports)} серийных портов.") return [port.device for port in ports] # Получение списка IP-адресов из сетевых адаптеров def get_network_adapters(): adapters = [] try: # Получаем имя хоста hostname = socket.gethostname() # Получаем все адреса для данного хоста addresses = socket.getaddrinfo(hostname, None) # Создаем множество для хранения уникальных IP-адресов unique_ips = set() for addr in addresses: ip = addr[4][0] # Пропускаем IPv6 и локальные адреса if ':' not in ip and not ip.startswith('127.'): unique_ips.add(ip) # Добавляем все найденные IP-адреса в список for ip in sorted(unique_ips): adapters.append(f"{ip}") # Добавляем 0.0.0.0 для прослушивания всех интерфейсов adapters.insert(0, "0.0.0.0") except Exception as e: logging.error(f"Ошибка при получении списка сетевых адаптеров: {e}", exc_info=True) # В случае ошибки возвращаем хотя бы 0.0.0.0 adapters = ["0.0.0.0"] return adapters # Создание соединения с устройством через последовательный порт def create_connection(settings): try: conn = serial.Serial(settings["port"], settings["baudrate"], timeout=1) logging.info(f"Соединение установлено с {settings['port']} на {settings['baudrate']}.") time.sleep(1) return conn except SerialException as e: logging.error(f"Ошибка подключения: {e}", exc_info=True) except Exception as e: logging.error(f"Неизвестная ошибка подключения: {e}", exc_info=True) return None # Проверка наличия логина и пароля в настройках и отправка их на устройство def send_login_password(serial_connection, login=None, password=None, is_gui=False): if not login: if is_gui: login = simpledialog.askstring("Login", "Введите логин:") if login is None: login = "" else: login = input("Введите логин: ") if not password: if is_gui: password = simpledialog.askstring("Password", "Введите пароль:", show="*") if password is None: password = "" else: password = getpass("Введите пароль: ") # Чтение ответа от устройства с учётом таймаута. def read_response(serial_connection, timeout, login=None, password=None, is_gui=False): response = b"" end_time = time.time() + timeout decoded = "" while time.time() < end_time: if serial_connection.in_waiting: chunk = serial_connection.read(serial_connection.in_waiting) response += chunk if b"--More--" in response: serial_connection.write(b"\n") response = response.replace(b"--More--", b"") try: decoded = response.decode(errors="ignore") except Exception: decoded = "" lines = decoded.rstrip().splitlines() if lines: last_line = lines[-1].strip() if re.search(r'(login:|username:)$', last_line, re.IGNORECASE): send_login_password(serial_connection, login, None, is_gui) response = b"" continue if re.search(r'(password:)$', last_line, re.IGNORECASE): send_login_password(serial_connection, None, password, is_gui) response = b"" continue if last_line.endswith(">") or last_line.endswith("#"): break else: time.sleep(0.1) return decoded # Генерация блоков команд для блочного копирования def generate_command_blocks(lines, block_size): blocks = [] current_block = [] for line in lines: trimmed = line.strip() if not trimmed: continue lower_line = trimmed.lower() if lower_line.startswith("vlan") or lower_line.startswith("enable") or lower_line.startswith("interface"): if current_block: blocks.append("\n".join(current_block)) current_block = [] blocks.append(trimmed) elif lower_line.startswith("exit"): current_block.append(trimmed) blocks.append("\n".join(current_block)) current_block = [] else: current_block.append(trimmed) if len(current_block) >= block_size: blocks.append("\n".join(current_block)) current_block = [] if current_block: blocks.append("\n".join(current_block)) return blocks # Выполнение команд из файла конфигурации def execute_commands_from_file( serial_connection, filename, timeout, copy_mode, block_size, log_callback=None, login=None, password=None, is_gui=False, ): try: with open(filename, "r", encoding="utf-8") as file: lines = [line for line in file if line.strip()] msg = f"Выполнение команд из файла: {filename}\n" logging.info(msg) if log_callback: log_callback(msg) # Если выбран построчный режим if copy_mode == "line": for cmd in lines: cmd = cmd.strip() max_attempts = 3 attempt = 0 while attempt < max_attempts: msg = f"[CMD] {cmd}" # Изменено форматирование для команды if log_callback: log_callback(msg) serial_connection.write((cmd + "\n").encode()) logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})") response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui) if response: if '^' in response: msg = ( f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n" f"Ответ устройства:\n{response}\n" f"Повторная отправка команды...\n" ) if log_callback: log_callback(msg) logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.") attempt += 1 time.sleep(1) continue else: msg = f"Ответ устройства:\n{response}\n" if log_callback: log_callback(msg) logging.info(f"Ответ устройства:\n{response}") break else: msg = f"Ответ не получен для команды: {cmd}\n" if log_callback: log_callback(msg) logging.warning(f"Нет ответа для команды: {cmd}") break if attempt == max_attempts: msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n" if log_callback: log_callback(msg) logging.error(msg) time.sleep(1) # Если выбран блочный режим elif copy_mode == "block": blocks = generate_command_blocks(lines, block_size) for block in blocks: msg = f"[CMD] Отправка блока команд:\n{block}" # Изменено форматирование для блока команд if log_callback: log_callback(msg) serial_connection.write((block + "\n").encode()) logging.info(f"Отправлен блок команд:\n{block}") response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui) # Если обнаружена ошибка в ответе на блок, отправляем команды по очереди if response and '^' in response: msg = ( f"[WARNING] Обнаружена ошибка при выполнении блока команд.\n" f"Ответ устройства:\n{response}\n" f"Пересылаются команды по отдельности...\n" ) if log_callback: log_callback(msg) logging.warning("Ошибка в блочном режиме – отправляем команды индивидуально.") for line in block.splitlines(): cmd = line.strip() if not cmd: continue max_attempts = 3 attempt = 0 while attempt < max_attempts: sub_msg = f"[CMD] {cmd}" # Изменено форматирование для команды if log_callback: log_callback(sub_msg) serial_connection.write((cmd + "\n").encode()) logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})") sub_response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui) if sub_response: if '^' in sub_response: sub_msg = ( f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n" f"Ответ устройства:\n{sub_response}\n" f"Повторная отправка команды...\n" ) if log_callback: log_callback(sub_msg) logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.") attempt += 1 time.sleep(1) continue else: sub_msg = f"Ответ устройства:\n{sub_response}\n" if log_callback: log_callback(sub_msg) logging.info(f"Ответ устройства:\n{sub_response}") break else: sub_msg = f"Ответ не получен для команды: {cmd}\n" if log_callback: log_callback(sub_msg) logging.warning(f"Нет ответа для команды: {cmd}") break if attempt == max_attempts: sub_msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n" if log_callback: log_callback(sub_msg) logging.error(sub_msg) time.sleep(1) else: if response: msg = f"Ответ устройства:\n{response}\n" if log_callback: log_callback(msg) logging.info(f"Ответ устройства:\n{response}") else: msg = f"Ответ не получен для блока:\n{block}\n" if log_callback: log_callback(msg) logging.warning(f"Нет ответа для блока:\n{block}") time.sleep(1) except SerialException as e: msg = f"Ошибка при выполнении команды: {e}\n" if log_callback: log_callback(msg) logging.error(f"Ошибка при выполнении команды: {e}", exc_info=True) except Exception as e: msg = f"Ошибка при выполнении команд: {e}\n" if log_callback: log_callback(msg) logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True) # Базовый класс для кастомных виджетов с поддержкой контекстного меню и горячих клавиш class CustomWidgetBase: def __init__(self): self.create_context_menu() self.bind_shortcuts() # Создание контекстного меню def create_context_menu(self): """Создание контекстного меню""" self.context_menu = tk.Menu(self, tearoff=0) self.context_menu.add_command(label="Вырезать", command=self.cut) self.context_menu.add_command(label="Копировать", command=self.copy) self.context_menu.add_command(label="Вставить", command=self.paste) self.context_menu.add_separator() self.context_menu.add_command(label="Выделить всё", command=self.select_all) self.bind("", self.show_context_menu) # Привязка горячих клавиш def bind_shortcuts(self): """Привязка горячих клавиш""" # Стандартные сочетания self.bind("", lambda e: self.event_generate("<>")) self.bind("", lambda e: self.event_generate("<>")) self.bind("", lambda e: self.event_generate("<>")) self.bind("", self.select_all) # Дополнительные сочетания self.bind("", lambda e: self.event_generate("<>")) self.bind("", lambda e: self.event_generate("<>")) self.bind("", lambda e: self.event_generate("<>")) # Отображение контекстного меню def show_context_menu(self, event): """Отображение контекстного меню""" self.context_menu.post(event.x_root, event.y_root) # Вырезание текста def cut(self): self.event_generate("<>") # Копирование текста def copy(self): self.event_generate("<>") # Вставка текста def paste(self): self.event_generate("<>") # Класс для текстового поля с поддержкой контекстного меню и горячих клавиш class CustomText(tk.Text, CustomWidgetBase): def __init__(self, *args, **kwargs): tk.Text.__init__(self, *args, **kwargs) CustomWidgetBase.__init__(self) def select_all(self, event=None): """Выделение всего текста в Text виджете""" self.tag_add(tk.SEL, "1.0", tk.END) self.mark_set(tk.INSERT, "1.0") self.see(tk.INSERT) return "break" # Класс для поля ввода с поддержкой контекстного меню и горячих клавиш class CustomEntry(ttk.Entry, CustomWidgetBase): def __init__(self, *args, **kwargs): ttk.Entry.__init__(self, *args, **kwargs) CustomWidgetBase.__init__(self) def select_all(self, event=None): """Выделение всего текста в Entry виджете""" self.select_range(0, tk.END) return "break" # Класс для окна настроек class SettingsWindow(tk.Toplevel): def __init__(self, parent, settings, callback=None): super().__init__(parent) self.title("Настройки") self.geometry("600x400") self.settings = settings self.callback = callback self.resizable(False, False) # Создаем фрейм для настроек settings_frame = ttk.Frame(self, padding="10") settings_frame.pack(fill=BOTH, expand=True) # COM порт ttk.Label(settings_frame, text="COM порт:").grid(row=0, column=0, sticky=W, pady=5) self.port_var = StringVar(value=settings.get("port", "")) self.port_combo = ttk.Combobox(settings_frame, textvariable=self.port_var) self.port_combo.grid(row=0, column=1, sticky=W, pady=5) ttk.Button(settings_frame, text="Обновить", command=self.update_ports).grid(row=0, column=2, padx=5) # Скорость передачи ttk.Label(settings_frame, text="Скорость:").grid(row=1, column=0, sticky=W, pady=5) self.baudrate_var = StringVar(value=str(settings.get("baudrate", 9600))) baudrate_combo = ttk.Combobox(settings_frame, textvariable=self.baudrate_var, values=["9600", "19200", "38400", "57600", "115200"]) baudrate_combo.grid(row=1, column=1, sticky=W, pady=5) # Таймаут ttk.Label(settings_frame, text="Таймаут (сек):").grid(row=2, column=0, sticky=W, pady=5) self.timeout_var = StringVar(value=str(settings.get("timeout", 10))) timeout_entry = ttk.Entry(settings_frame, textvariable=self.timeout_var) timeout_entry.grid(row=2, column=1, sticky=W, pady=5) # Режим копирования ttk.Label(settings_frame, text="Режим копирования:").grid(row=3, column=0, sticky=W, pady=5) self.copy_mode_var = StringVar(value=settings.get("copy_mode", "line")) copy_mode_frame = ttk.Frame(settings_frame) copy_mode_frame.grid(row=3, column=1, sticky=W, pady=5) ttk.Radiobutton(copy_mode_frame, text="Построчно", value="line", variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT) ttk.Radiobutton(copy_mode_frame, text="Блоками", value="block", variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT) # Размер блока self.block_size_label = ttk.Label(settings_frame, text="Размер блока:") self.block_size_label.grid(row=4, column=0, sticky=W, pady=5) self.block_size_var = StringVar(value=str(settings.get("block_size", 15))) self.block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var) self.block_size_entry.grid(row=4, column=1, sticky=W, pady=5) # Приглашение командной строки ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5) self.prompt_var = StringVar(value=settings.get("prompt", ">")) prompt_entry = CustomEntry(settings_frame, textvariable=self.prompt_var) prompt_entry.grid(row=5, column=1, sticky=W, pady=5) # Кнопки button_frame = ttk.Frame(settings_frame) button_frame.grid(row=6, column=0, columnspan=3, pady=20) ttk.Button(button_frame, text="Сохранить", command=self.save_settings).pack(side=LEFT, padx=5) ttk.Button(button_frame, text="Отмена", command=self.destroy).pack(side=LEFT, padx=5) self.update_ports() # Инициализация видимости поля размера блока self.toggle_block_size() # Центрируем окно self.center_window() # Переключение видимости поля размера блока def toggle_block_size(self): if self.copy_mode_var.get() == "line": self.block_size_label.grid_remove() self.block_size_entry.grid_remove() else: self.block_size_label.grid() self.block_size_entry.grid() # Центрирование окна def center_window(self): self.update_idletasks() width = self.winfo_width() height = self.winfo_height() x = (self.winfo_screenwidth() // 2) - (width // 2) y = (self.winfo_screenheight() // 2) - (height // 2) self.geometry(f"{width}x{height}+{x}+{y}") # Обновление списка доступных последовательных портов def update_ports(self): ports = list_serial_ports() self.port_combo["values"] = ports if ports and not self.port_var.get(): self.port_var.set(ports[0]) # Сохранение настроек def save_settings(self): try: self.settings.update({ "port": self.port_var.get(), "baudrate": int(self.baudrate_var.get()), "timeout": int(self.timeout_var.get()), "copy_mode": self.copy_mode_var.get(), "block_size": int(self.block_size_var.get()), "prompt": self.prompt_var.get() }) settings_save(self.settings) if self.callback: self.callback() self.destroy() messagebox.showinfo("Успех", "Настройки успешно сохранены") except ValueError as e: messagebox.showerror("Ошибка", "Проверьте правильность введенных значений") # Общая функция для добавления текста в текстовое поле def append_text_to_widget(widget, text): # Проверяем, заканчивается ли текст символом новой строки if not text.endswith('\n'): text += '\n' widget.insert(END, text) widget.see(END) # Общая функция для выбора файла конфигурации def select_config_file(self, var, save_to_settings=False): filename = filedialog.askopenfilename( title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")] ) if filename: var.set(filename) if save_to_settings: self.settings["config_file"] = filename settings_save(self.settings) # Общая функция для отправки команды и обработки ответа def send_command_and_process_response( serial_connection, cmd, timeout, max_attempts=3, log_callback=None, login=None, password=None, is_gui=False ): attempt = 0 while attempt < max_attempts: msg = f"\nОтправка команды: {cmd}\n" if log_callback: log_callback(msg) serial_connection.write((cmd + "\n").encode()) logging.info(f"Отправлена команда: {cmd}") response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui) if response: if '^' in response: msg = ( f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n" f"Ответ устройства:\n{response}\n" f"Повторная отправка команды...\n" ) if log_callback: log_callback(msg) logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.") attempt += 1 time.sleep(1) continue else: msg = f"Ответ устройства:\n{response}\n" if log_callback: log_callback(msg) logging.info(f"Ответ устройства:\n{response}") return True, response else: msg = f"Ответ не получен для команды: {cmd}\n" if log_callback: log_callback(msg) logging.warning(f"Нет ответа для команды: {cmd}") return False, None msg = f"[ERROR] Команда не выполнена корректно: {cmd}\n" if log_callback: log_callback(msg) logging.error(msg) return False, None # Класс для терминального виджета с расширенной функциональностью class TerminalWidget(CustomText): def __init__(self, master, *args, **kwargs): super().__init__(master, *args, **kwargs) # Настройка цветов для разных типов сообщений self.tag_configure("error", foreground="red") self.tag_configure("warning", foreground="orange") self.tag_configure("info", foreground="blue") self.tag_configure("command", foreground="green") self.tag_configure("separator", foreground="gray") # Добавляем скроллбар self.scrollbar = ttk.Scrollbar(self, command=self.yview) self.scrollbar.pack(side=tk.RIGHT, fill=tk.Y) self.config(yscrollcommand=self.scrollbar.set) # Настройка отступов и переносов self.config( wrap=tk.WORD, padx=5, pady=5, spacing1=2, # Отступ перед абзацем spacing2=2, # Межстрочный интервал spacing3=2 # Отступ после абзаца ) # Счетчик команд для разделителей self.command_counter = 0 def append_text(self, text, message_type=None): """ Добавление текста с определенным типом сообщения message_type может быть: 'error', 'warning', 'info', 'command' """ # Добавляем текст if not text.endswith('\n'): text += '\n' start_index = self.index(tk.END) self.insert(tk.END, text) end_index = self.index(tk.END) # Применяем тег в зависимости от типа сообщения if message_type: self.tag_add(message_type, start_index, end_index) # Автоматическая прокрутка к концу self.see(tk.END) # Обновляем виджет self.update_idletasks() def append_error(self, text): """Добавление сообщения об ошибке""" self.append_text(text, "error") def append_warning(self, text): """Добавление предупреждения""" self.append_text(text, "warning") def append_info(self, text): """Добавление информационного сообщения""" self.append_text(text, "info") def append_command(self, text): """Добавление команды с разделителем""" # Добавляем разделитель между командами if self.command_counter > 0: self.insert(tk.END, "\n" + "─" * 80 + "\n", "separator") self.command_counter += 1 # Добавляем команду self.append_text(text, "command") def clear(self): """Очистка терминала""" self.delete("1.0", tk.END) self.command_counter = 0 # Основной класс для графического интерфейса class SerialAppGUI(tk.Tk): def __init__(self, settings): super().__init__() self.title("ComConfigCopy") self.geometry("900x700") # Добавляем VERSION как атрибут класса self.VERSION = VERSION # Добавляем атрибуты для управления выполнением файла self.execution_thread = None self.execution_paused = False self.execution_stop = False # Добавляем атрибуты для таймера и прогресса self.start_time = 0 self.elapsed_time = 0 self.timer_running = False # Добавляем атрибут для отслеживания состояния COM-порта self.port_monitor_thread = None self.port_monitoring = False # Инициализация проверки обновлений self.update_checker = UpdateChecker( VERSION, "https://gitea.filow.ru/LowaSC/ComConfigCopy", include_prereleases=False # Проверяем только стабильные версии ) # Настройка стиля self.style = ttk.Style(self) self.style.theme_use("clam") default_font = ("Segoe UI", 10) self.option_add("*Font", default_font) self.settings = settings self.connection = None self.tftp_server = None self.create_menu() self.create_tabs() # Добавляем статус бар self.create_status_bar() # Обновляем информацию в статус баре self.update_status_bar() # Проверка первого запуска self.check_first_run() # Создание меню def create_menu(self): menubar = tk.Menu(self) self.config(menu=menubar) # Меню "Файл" file_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="Файл", menu=file_menu) file_menu.add_command(label="Настройки", command=self.open_settings) file_menu.add_separator() file_menu.add_command(label="Выход", command=self.quit) # Меню "Справка" help_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="Справка", menu=help_menu) help_menu.add_command(label="Проверить обновления", command=self.check_for_updates) help_menu.add_separator() help_menu.add_command(label="О программе", command=self.open_about) # Проверка наличия обновлений def check_for_updates(self): def on_update_check(update_available, error): if error: messagebox.showerror( "Ошибка проверки обновлений", f"Не удалось проверить наличие обновлений:\n{error}" ) elif update_available: release_info = self.update_checker.get_release_notes() if release_info: # Форматируем сообщение message = ( f"Доступна новая версия {release_info['version']}!\n\n" f"Тип релиза: {'Пре-релиз' if release_info['type'] == 'prerelease' else 'Стабильный'}\n\n" "Изменения:\n" f"{release_info['description']}\n\n" "Хотите перейти на страницу загрузки?" ) response = messagebox.askyesno( "Доступно обновление", message, ) if response: webbrowser.open(release_info["link"]) else: messagebox.showerror( "Ошибка", "Не удалось получить информацию о новой версии" ) else: messagebox.showinfo( "Проверка обновлений", "У вас установлена последняя версия программы" ) self.update_checker.check_updates(callback=on_update_check) # Обработчик изменения настроек def on_settings_changed(self): self.settings = settings_load() self.update_status_bar() # Добавляем обновление статус бара # Открытие окна настроек def open_settings(self): settings_window = SettingsWindow(self, self.settings, self.on_settings_changed) settings_window.transient(self) settings_window.grab_set() # Проверка первого запуска программы def check_first_run(self): show_settings = False # Проверяем существование файла настроек if not os.path.exists(SETTINGS_FILE): show_settings = True else: # Проверяем содержимое файла настроек try: with open(SETTINGS_FILE, "r", encoding="utf-8") as f: settings = json.load(f) # Если порт не настроен, считаем это первым запуском if settings.get("port") is None: show_settings = True except Exception: # Если файл поврежден или не читается, тоже показываем настройки show_settings = True if show_settings: # Создаем папку Settings, если её нет os.makedirs("Settings", exist_ok=True) response = messagebox.askyesno( "Первый запуск", "Это первый запуск программы. Хотите настроить параметры подключения сейчас?" ) if response: self.open_settings() # Создание вкладок def create_tabs(self): self.notebook = ttk.Notebook(self) self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5) # Создаем вкладки interactive_frame = ttk.Frame(self.notebook) file_exec_frame = ttk.Frame(self.notebook) config_editor_frame = ttk.Frame(self.notebook) tftp_frame = ttk.Frame(self.notebook) self.notebook.add(interactive_frame, text="Интерактивный режим") self.notebook.add(file_exec_frame, text="Выполнение файла") self.notebook.add(config_editor_frame, text="Редактор конфигурации") self.notebook.add(tftp_frame, text="TFTP Сервер") self.create_interactive_tab(interactive_frame) self.create_file_exec_tab(file_exec_frame) self.create_config_editor_tab(config_editor_frame) self.create_tftp_tab(tftp_frame) # Создание вкладки "Интерактивный режим" def create_interactive_tab(self, frame): control_frame = ttk.Frame(frame) control_frame.pack(fill=X, pady=5) # Кнопка подключения с иконкой connect_btn = ttk.Button( control_frame, text="⚡ Подключиться", # Unicode символ для "молнии" command=self.connect_device ) connect_btn.pack(side=LEFT, padx=5) # Кнопка отключения с иконкой disconnect_btn = ttk.Button( control_frame, text="✕ Отключиться", # Unicode символ для "крестика" command=self.disconnect_device ) disconnect_btn.pack(side=LEFT, padx=5) # Используем новый TerminalWidget вместо CustomText self.interactive_text = TerminalWidget(frame, height=20) self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5) input_frame = ttk.Frame(frame) input_frame.pack(fill=X, pady=5) ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5) self.command_entry = CustomEntry(input_frame, width=50) self.command_entry.pack(side=LEFT, padx=5) # Привязываем обработчик нажатия Enter к полю ввода self.command_entry.bind('', lambda event: self.send_command()) # Кнопка отправки с иконкой send_btn = ttk.Button( input_frame, text="➤ Отправить", # Unicode символ для "стрелки" command=self.send_command ) send_btn.pack(side=LEFT, padx=5) # Подключение к устройству def connect_device(self): if self.connection: messagebox.showinfo("Информация", "Уже подключено.") return if not self.settings.get("port"): messagebox.showerror("Ошибка", "COM-порт не выбран!") return self.connection = create_connection(self.settings) if self.connection: self.interactive_text.append_info("[INFO] Подключение установлено.") # Запускаем мониторинг состояния порта self.start_port_monitoring() self.update_status_bar() else: self.interactive_text.append_error("[ERROR] Не удалось установить соединение.") self.update_status_bar() # Отключение от устройства def disconnect_device(self): if self.connection: try: # Останавливаем мониторинг перед закрытием соединения self.stop_port_monitoring() self.connection.close() except Exception: pass finally: self.connection = None self.interactive_text.append_info("[INFO] Соединение закрыто.") self.update_status_bar() else: messagebox.showinfo("Информация", "Соединение не установлено.") def start_port_monitoring(self): """Запуск мониторинга состояния COM-порта""" if not self.port_monitor_thread or not self.port_monitor_thread.is_alive(): self.port_monitoring = True self.port_monitor_thread = threading.Thread(target=self.monitor_port_state, daemon=True) self.port_monitor_thread.start() def stop_port_monitoring(self): """Остановка мониторинга состояния COM-порта""" self.port_monitoring = False if self.port_monitor_thread and self.port_monitor_thread.is_alive(): self.port_monitor_thread.join(timeout=1.0) def monitor_port_state(self): """Мониторинг состояния COM-порта""" while self.port_monitoring and self.connection: try: # Проверяем, что порт всё ещё открыт и отвечает if not self.connection.is_open: self.connection.open() # Если порт отвечает, обновляем статус self.after(0, self.update_connection_indicator, True) except (SerialException, OSError): # Если возникла ошибка, помечаем порт как отключенный self.after(0, self.update_connection_indicator, False) # Пытаемся переподключиться try: if self.connection: self.connection.close() except: pass self.connection = None break time.sleep(1) # Проверяем состояние каждую секунду def update_connection_indicator(self, is_connected): """Обновление индикатора подключения COM-порта""" if is_connected: self.connection_indicator.configure(text="⬤", foreground='green') else: self.connection_indicator.configure(text="⬤", foreground='red') # Отправка команды def send_command(self): if not self.connection: messagebox.showerror("Ошибка", "Сначала установите соединение!") return cmd = self.command_entry.get().strip() if not cmd: return self.interactive_text.append_command(f"[CMD] {cmd}") self.command_entry.delete(0, END) # Очищаем поле ввода threading.Thread(target=self.process_command, args=(cmd,), daemon=True).start() # Обработка команды def process_command(self, cmd): try: success, response = send_command_and_process_response( self.connection, cmd, self.settings.get("timeout", 10), max_attempts=3, log_callback=self.append_interactive_text, login=self.settings.get("login"), password=self.settings.get("password"), is_gui=True ) except SerialException as e: self.interactive_text.append_error(f"[ERROR] Ошибка при отправке команды: {e}") logging.error(f"Ошибка отправки команды: {e}", exc_info=True) except Exception as e: self.interactive_text.append_error(f"[ERROR] Неизвестная ошибка: {e}") logging.error(f"Неизвестная ошибка: {e}", exc_info=True) # Добавление текста в текстовое поле def append_interactive_text(self, text): if "[ERROR]" in text: self.interactive_text.append_error(text) elif "[WARNING]" in text or "[WARN]" in text: self.interactive_text.append_warning(text) elif "[INFO]" in text: self.interactive_text.append_info(text) else: self.interactive_text.append_text(text) # Создание вкладки "Выполнить команды из файла" def create_file_exec_tab(self, frame): file_frame = ttk.Frame(frame) file_frame.pack(fill=X, pady=5) ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5) self.file_exec_var = StringVar(value=self.settings.get("config_file") or "") CustomEntry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5) ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5) # Создаем фрейм для кнопок управления control_frame = ttk.Frame(frame) control_frame.pack(pady=5) # Кнопки управления выполнением self.start_button = ttk.Button(control_frame, text="▶ Старт", command=self.start_execution) self.start_button.pack(side=LEFT, padx=5) self.pause_button = ttk.Button(control_frame, text="⏸ Пауза", command=self.pause_execution, state="disabled") self.pause_button.pack(side=LEFT, padx=5) self.stop_button = ttk.Button(control_frame, text="⏹ Остановить", command=self.stop_execution, state="disabled") self.stop_button.pack(side=LEFT, padx=5) # Создаем фрейм для индикатора прогресса и таймера progress_frame = ttk.Frame(frame) progress_frame.pack(fill=X, pady=5, padx=5) # Добавляем индикатор прогресса progress_label_frame = ttk.Frame(progress_frame) progress_label_frame.pack(fill=X) self.progress_label = ttk.Label(progress_label_frame, text="Прогресс: 0/0 команд") self.progress_label.pack(side=LEFT, padx=5) self.timer_label = ttk.Label(progress_label_frame, text="Время: 00:00:00") self.timer_label.pack(side=RIGHT, padx=5) self.progress_bar = ttk.Progressbar(progress_frame, mode='determinate') self.progress_bar.pack(fill=X, pady=5) # Используем новый TerminalWidget вместо CustomText self.file_exec_text = TerminalWidget(frame, height=15) self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5) def append_file_exec_text(self, text): if "[ERROR]" in text: self.file_exec_text.append_error(text) elif "[WARNING]" in text or "[WARN]" in text: self.file_exec_text.append_warning(text) elif "[INFO]" in text: self.file_exec_text.append_info(text) elif "[CMD]" in text: # Добавляем обработку команд self.file_exec_text.append_command(text) else: self.file_exec_text.append_text(text) # Выбор файла конфигурации для выполнения команд def select_config_file_fileexec(self): select_config_file(self, self.file_exec_var) # Выполнение команд из файла def execute_file_commands(self): if not self.settings.get("port"): messagebox.showerror("Ошибка", "COM-порт не выбран!") return if not self.file_exec_var.get(): messagebox.showerror("Ошибка", "Файл конфигурации не выбран!") return if not self.connection: self.connection = create_connection(self.settings) if self.connection: # Запускаем мониторинг при новом подключении self.start_port_monitoring() if not self.connection: self.append_file_exec_text("[ERROR] Не удалось установить соединение.\n") return try: with open(self.file_exec_var.get(), "r", encoding="utf-8") as file: self.commands = [line.strip() for line in file if line.strip()] self.current_command_index = 0 self.execution_paused = False self.execution_stop = False # Инициализируем прогресс бар self.progress_bar['value'] = 0 self.update_progress() # Запускаем таймер self.start_time = time.time() self.elapsed_time = 0 self.timer_running = True self.update_timer() # Запускаем выполнение команд в отдельном потоке threading.Thread(target=self.command_execution_thread, daemon=True).start() except Exception as e: self.append_file_exec_text(f"[ERROR] Ошибка при чтении файла: {str(e)}\n") self.reset_execution_buttons() def command_execution_thread(self): """Отдельный поток для выполнения команд""" copy_mode = self.settings.get("copy_mode", "line") block_size = self.settings.get("block_size", 15) # Счетчик последовательных ошибок consecutive_errors = 0 MAX_CONSECUTIVE_ERRORS = 3 # Максимальное количество последовательных ошибок def check_connection(): """Проверка состояния соединения""" if not self.connection or not self.connection.is_open: self.append_file_exec_text("[ERROR] Соединение потеряно!\n") # Автоматически ставим на паузу self.execution_paused = True self.after(0, lambda: self.pause_button.config(text="▶ Продолжить")) # Показываем сообщение пользователю self.after(0, lambda: messagebox.showerror( "Ошибка соединения", "Соединение с устройством потеряно!\nВыполнение команд приостановлено.\n\n" "Пожалуйста:\n" "1. Проверьте подключение\n" "2. Нажмите 'Продолжить' после восстановления соединения\n" " или 'Остановить' для прекращения выполнения" )) return False return True def handle_no_response(cmd_or_block, is_block=False): """Обработка отсутствия ответа от устройства""" nonlocal consecutive_errors consecutive_errors += 1 if consecutive_errors >= MAX_CONSECUTIVE_ERRORS: self.append_file_exec_text( f"[ERROR] Обнаружено {consecutive_errors} последовательных ошибок!\n" "Возможно, устройство не отвечает или проблемы с соединением.\n" ) # Автоматически ставим на паузу self.execution_paused = True self.after(0, lambda: self.pause_button.config(text="▶ Продолжить")) # Показываем сообщение пользователю self.after(0, lambda: messagebox.showerror( "Устройство не отвечает", f"Обнаружено {consecutive_errors} последовательных ошибок!\n\n" "Возможные причины:\n" "1. Устройство не отвечает на команды\n" "2. Проблемы с соединением\n" "3. Неверный формат команд\n\n" "Выполнение приостановлено.\n" "Проверьте подключение и состояние устройства,\n" "затем нажмите 'Продолжить' или 'Остановить'." )) return False return True if copy_mode == "line": # Построчный режим while self.current_command_index < len(self.commands): if self.execution_stop: break if self.execution_paused: time.sleep(0.1) continue # Проверяем соединение перед каждой командой if not check_connection(): continue cmd = self.commands[self.current_command_index] try: success, response = send_command_and_process_response( self.connection, cmd, self.settings.get("timeout", 10), max_attempts=3, log_callback=self.append_file_exec_text, login=self.settings.get("login"), password=self.settings.get("password"), is_gui=True ) if not success: self.append_file_exec_text(f"[ERROR] Не удалось выполнить команду: {cmd}\n") # Проверяем соединение после неудачной попытки if not check_connection(): continue # Обрабатываем отсутствие ответа if not handle_no_response(cmd): continue else: # Сбрасываем счетчик ошибок при успешном выполнении consecutive_errors = 0 self.current_command_index += 1 self.after(0, self.update_progress) time.sleep(1) # Задержка между командами except Exception as e: self.append_file_exec_text(f"[ERROR] Ошибка при выполнении команды: {str(e)}\n") if not check_connection(): continue break else: # Блочный режим blocks = generate_command_blocks(self.commands, block_size) total_blocks = len(blocks) current_block = 0 while current_block < total_blocks: if self.execution_stop: break if self.execution_paused: time.sleep(0.1) continue # Проверяем соединение перед каждым блоком if not check_connection(): continue block = blocks[current_block] try: # Выводим блок команд без [CMD] префикса self.append_file_exec_text(f"Выполнение блока команд:\n{block}\n") success, response = send_command_and_process_response( self.connection, block, self.settings.get("timeout", 10), max_attempts=3, log_callback=None, # Отключаем вывод для первой попытки login=self.settings.get("login"), password=self.settings.get("password"), is_gui=True ) if not success or (response and '^' in response): self.append_file_exec_text("[WARNING] Ошибка при выполнении блока команд. Отправляю команды по отдельности...\n") # Проверяем соединение перед отправкой отдельных команд if not check_connection(): continue # Обрабатываем отсутствие ответа для блока if not success and not handle_no_response(block, True): continue # Отправляем команды блока по отдельности for cmd in block.splitlines(): if self.execution_stop: break if cmd.strip(): if not check_connection(): break success, resp = send_command_and_process_response( self.connection, cmd, self.settings.get("timeout", 10), max_attempts=3, log_callback=self.append_file_exec_text, login=self.settings.get("login"), password=self.settings.get("password"), is_gui=True ) if not success: self.append_file_exec_text(f"[ERROR] Не удалось выполнить команду: {cmd}\n") if not check_connection(): break # Обрабатываем отсутствие ответа для отдельной команды if not handle_no_response(cmd): break else: # Сбрасываем счетчик ошибок при успешном выполнении consecutive_errors = 0 else: # Если блок выполнился успешно, выводим ответ и сбрасываем счетчик ошибок consecutive_errors = 0 if response: self.append_file_exec_text(f"Ответ устройства:\n{response}\n") # Обновляем прогресс на основе количества выполненных блоков current_block += 1 self.current_command_index = (current_block * 100) // total_blocks self.after(0, self.update_progress) time.sleep(1) except Exception as e: self.append_file_exec_text(f"[ERROR] Ошибка при выполнении блока команд: {str(e)}\n") if not check_connection(): continue break # Завершение выполнения self.after(0, self.execution_completed) def execution_completed(self): """Обработка завершения выполнения в главном потоке""" if self.execution_stop: self.append_file_exec_text("[INFO] Выполнение остановлено.\n") else: self.append_file_exec_text("[INFO] Выполнение завершено.\n") self.reset_execution_buttons() def start_execution(self): # Проверяем подключение перед стартом if not self.connection or not self.connection.is_open: self.connection = create_connection(self.settings) if self.connection: # Запускаем мониторинг при новом подключении self.start_port_monitoring() else: self.append_file_exec_text("[ERROR] Не удалось установить соединение.\n") return if not hasattr(self, 'commands') or not self.commands: self.execution_thread = threading.Thread(target=self.execute_file_commands, daemon=True) self.execution_thread.start() else: # Если команды уже загружены, просто возобновляем выполнение self.execution_paused = False self.execution_stop = False self.timer_running = True threading.Thread(target=self.command_execution_thread, daemon=True).start() self.start_button.config(state="disabled") self.pause_button.config(state="normal") self.stop_button.config(state="normal") self.update_status_bar() # Обновляем статус бар после подключения def pause_execution(self): if not self.execution_paused: self.execution_paused = True self.pause_button.config(text="▶ Продолжить") self.append_file_exec_text("[INFO] Выполнение приостановлено.\n") else: self.execution_paused = False self.pause_button.config(text="⏸ Пауза") self.append_file_exec_text("[INFO] Выполнение возобновлено.\n") def stop_execution(self): self.execution_stop = True self.execution_paused = False self.timer_running = False # Отключаемся от COM-порта if self.connection: try: self.stop_port_monitoring() self.connection.close() self.connection = None self.append_file_exec_text("[INFO] Соединение закрыто.\n") except Exception as e: self.append_file_exec_text(f"[ERROR] Ошибка при закрытии соединения: {str(e)}\n") self.current_command_index = 0 # Сбрасываем индекс текущей команды self.progress_bar['value'] = 0 # Сбрасываем прогресс-бар self.update_progress() # Обновляем отображение прогресса self.append_file_exec_text("[INFO] Выполнение остановлено.\n") self.reset_execution_buttons() # Сбрасываем состояние кнопок self.update_status_bar() # Обновляем статус бар def reset_execution_buttons(self): self.start_button.config(state="normal") self.pause_button.config(state="disabled", text="⏸ Пауза") self.stop_button.config(state="disabled") self.execution_paused = False self.execution_stop = False self.timer_running = False # Очищаем список команд, чтобы при следующем старте они загрузились заново if hasattr(self, 'commands'): delattr(self, 'commands') def update_timer(self): """Обновление таймера в главном потоке""" if self.timer_running: if not self.execution_paused: self.elapsed_time = time.time() - self.start_time hours = int(self.elapsed_time // 3600) minutes = int((self.elapsed_time % 3600) // 60) seconds = int(self.elapsed_time % 60) self.timer_label.config(text=f"Время: {hours:02d}:{minutes:02d}:{seconds:02d}") self.after(1000, self.update_timer) # Обновление каждую секунду def update_progress(self): """Обновление прогресса в главном потоке""" if hasattr(self, 'commands'): total_commands = len(self.commands) current = self.current_command_index self.progress_label.config(text=f"Прогресс: {current}/{total_commands} команд") if total_commands > 0: progress = (current / total_commands) * 100 self.progress_bar['value'] = progress # Создание вкладки "Редактор конфигурационного файла" def create_config_editor_tab(self, frame): top_frame = ttk.Frame(frame) top_frame.pack(fill=X, pady=5) ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5) self.editor_file_var = StringVar(value=self.settings.get("config_file") or "") CustomEntry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5) ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5) ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5) ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5) self.config_editor_text = CustomText(frame, wrap="word") self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5) # Выбор файла конфигурации для редактирования def select_config_file_editor(self): select_config_file(self, self.editor_file_var, save_to_settings=True) # Загрузка файла конфигурации def load_config_file(self): filename = self.editor_file_var.get() if not filename or not os.path.exists(filename): messagebox.showerror("Ошибка", "Файл конфигурации не выбран или не существует.") return try: with open(filename, "r", encoding="utf-8") as f: content = f.read() self.config_editor_text.delete("1.0", END) self.config_editor_text.insert(END, content) messagebox.showinfo("Информация", "Файл загружен.") except Exception as e: logging.error(f"Ошибка загрузки файла: {e}", exc_info=True) messagebox.showerror("Ошибка", f"Не удалось загрузить файл:\n{e}") # Сохранение файла конфигурации def save_config_file(self): filename = self.editor_file_var.get() if not filename: messagebox.showerror("Ошибка", "Файл конфигурации не выбран.") return try: content = self.config_editor_text.get("1.0", END) with open(filename, "w", encoding="utf-8") as f: f.write(content) messagebox.showinfo("Информация", "Файл сохранён.") except Exception as e: logging.error(f"Ошибка сохранения файла: {e}", exc_info=True) messagebox.showerror("Ошибка", f"Не удалось сохранить файл:\n{e}") # Открытие окна "О программе" def open_about(self): about_window = AboutWindow(self) about_window.transient(self) about_window.grab_set() # Создание вкладки TFTP сервера def create_tftp_tab(self, frame): # Создаем фрейм для управления TFTP сервером tftp_frame = ttk.Frame(frame) tftp_frame.pack(fill=BOTH, expand=True, padx=5, pady=5) # Создаем и размещаем элементы управления controls_frame = ttk.LabelFrame(tftp_frame, text="Управление TFTP сервером") controls_frame.pack(fill=X, padx=5, pady=5) # IP адрес ip_frame = ttk.Frame(controls_frame) ip_frame.pack(fill=X, padx=5, pady=2) ttk.Label(ip_frame, text="IP адрес:").pack(side=LEFT, padx=5) self.tftp_ip_var = StringVar(value="0.0.0.0") self.tftp_ip_combo = ttk.Combobox(ip_frame, textvariable=self.tftp_ip_var, state="readonly") self.tftp_ip_combo.pack(side=LEFT, fill=X, expand=True, padx=5) ttk.Button(ip_frame, text="Обновить", command=self.update_network_adapters).pack(side=LEFT, padx=5) # Заполняем список адаптеров self.update_network_adapters() # Порт port_frame = ttk.Frame(controls_frame) port_frame.pack(fill=X, padx=5, pady=2) ttk.Label(port_frame, text="Порт:").pack(side=LEFT, padx=5) self.tftp_port_var = StringVar(value="69") self.tftp_port_entry = CustomEntry(port_frame, textvariable=self.tftp_port_var) self.tftp_port_entry.pack(fill=X, expand=True, padx=5) # Кнопки управления buttons_frame = ttk.Frame(controls_frame) buttons_frame.pack(fill=X, padx=5, pady=5) self.start_tftp_button = ttk.Button( buttons_frame, text="Запустить сервер", command=self.start_tftp_server ) self.start_tftp_button.pack(side=LEFT, padx=5) self.stop_tftp_button = ttk.Button( buttons_frame, text="Остановить сервер", command=self.stop_tftp_server, state="disabled" ) self.stop_tftp_button.pack(side=LEFT, padx=5) # Лог сервера log_frame = ttk.LabelFrame(tftp_frame, text="Лог сервера") log_frame.pack(fill=BOTH, expand=True, padx=5, pady=5) # Используем новый TerminalWidget вместо CustomText self.tftp_log_text = TerminalWidget(log_frame, height=10) self.tftp_log_text.pack(fill=BOTH, expand=True, padx=5, pady=5) # Статус передач transfers_frame = ttk.LabelFrame(tftp_frame, text="Активные передачи") transfers_frame.pack(fill=BOTH, expand=True, padx=5, pady=5) # Создаем таблицу для отображения активных передач columns = ("client", "filename", "progress", "remaining", "time") self.transfers_tree = ttk.Treeview(transfers_frame, columns=columns, show="headings") # Настраиваем заголовки колонок self.transfers_tree.heading("client", text="Клиент") self.transfers_tree.heading("filename", text="Файл") self.transfers_tree.heading("progress", text="Прогресс") self.transfers_tree.heading("remaining", text="Осталось") self.transfers_tree.heading("time", text="Время") # Настраиваем ширину колонок self.transfers_tree.column("client", width=120) self.transfers_tree.column("filename", width=150) self.transfers_tree.column("progress", width=100) self.transfers_tree.column("remaining", width=100) self.transfers_tree.column("time", width=80) self.transfers_tree.pack(fill=BOTH, expand=True, padx=5, pady=5) # Инициализация TFTP сервера self.tftp_server = None self.tftp_server_thread = None # Запуск TFTP сервера def start_tftp_server(self): try: # Получаем выбранный IP-адрес ip = self.tftp_ip_var.get() if not ip: messagebox.showerror("Ошибка", "Выберите IP-адрес для TFTP сервера") return # Проверяем корректность порта try: port = int(self.tftp_port_var.get()) if port <= 0 or port > 65535: raise ValueError("Порт должен быть в диапазоне 1-65535") except ValueError as e: messagebox.showerror("Ошибка", f"Некорректный порт: {str(e)}") return # Создаем экземпляр TFTP сервера self.tftp_server = TFTPServer("Firmware") # Устанавливаем callback для логирования def log_callback(message): # Фильтруем дублирующиеся сообщения о запуске/остановке сервера if "[INFO] TFTP сервер запущен" in message and hasattr(self, '_server_started'): return if "[INFO] TFTP сервер остановлен" in message and hasattr(self, '_server_stopped'): return self.append_tftp_log(message) # Устанавливаем флаги для отслеживания состояния if "[INFO] TFTP сервер запущен" in message: self._server_started = True elif "[INFO] TFTP сервер остановлен" in message: self._server_stopped = True # Обновляем информацию о передачах self.update_transfers_info() self.tftp_server.set_log_callback(log_callback) # Запускаем сервер в отдельном потоке self.tftp_server_thread = threading.Thread( target=self.run_tftp_server, args=(ip, port), daemon=True ) self.tftp_server_thread.start() # Обновляем состояние кнопок и элементов управления self.start_tftp_button.config(state="disabled") self.stop_tftp_button.config(state="normal") self.tftp_ip_combo.config(state="disabled") self.tftp_port_entry.config(state="disabled") # Запускаем периодическое обновление информации о передачах self.update_transfers_periodically() # Обновляем статус бар после запуска сервера self.update_status_bar() except Exception as e: self.append_tftp_log(f"[ERROR] Ошибка запуска сервера: {str(e)}") messagebox.showerror("Ошибка", f"Не удалось запустить TFTP сервер: {str(e)}") # Запуск TFTP сервера в отдельном потоке def run_tftp_server(self, ip, port): try: self.tftp_server.start_server(ip, port) except Exception as e: self.append_tftp_log(f"[ERROR] Ошибка работы сервера: {str(e)}") # Остановка TFTP сервера def stop_tftp_server(self): if self.tftp_server: try: # Отключаем кнопки на время остановки сервера self.start_tftp_button.config(state="disabled") self.stop_tftp_button.config(state="disabled") # Сбрасываем флаги состояния if hasattr(self, '_server_started'): delattr(self, '_server_started') if hasattr(self, '_server_stopped'): delattr(self, '_server_stopped') # Останавливаем сервер self.tftp_server.stop_server() # Ждем завершения потока сервера с таймаутом if self.tftp_server_thread: self.tftp_server_thread.join(timeout=5.0) if self.tftp_server_thread.is_alive(): self.append_tftp_log("[WARN] Превышено время ожидания остановки сервера") # Очищаем ссылки на сервер и поток self.tftp_server = None self.tftp_server_thread = None # Обновляем состояние кнопок self.start_tftp_button.config(state="normal") self.stop_tftp_button.config(state="disabled") self.tftp_ip_combo.config(state="normal") self.tftp_port_entry.config(state="normal") # Очищаем таблицу передач for item in self.transfers_tree.get_children(): self.transfers_tree.delete(item) # Обновляем статус бар после остановки сервера self.update_status_bar() except Exception as e: self.append_tftp_log(f"[ERROR] Ошибка остановки сервера: {str(e)}") messagebox.showerror("Ошибка", f"Не удалось остановить TFTP сервер: {str(e)}") # Восстанавливаем состояние кнопок в случае ошибки self.start_tftp_button.config(state="disabled") self.stop_tftp_button.config(state="normal") def append_tftp_log(self, text): if "[ERROR]" in text: self.tftp_log_text.append_error(text) elif "[WARNING]" in text or "[WARN]" in text: self.tftp_log_text.append_warning(text) elif "[INFO]" in text: self.tftp_log_text.append_info(text) else: self.tftp_log_text.append_text(text) # Обновление информации об активных передачах def update_transfers_info(self): if not self.tftp_server: return # Очищаем текущие записи for item in self.transfers_tree.get_children(): self.transfers_tree.delete(item) # Добавляем информацию о текущих передачах for client_addr, transfer_info in self.tftp_server.active_transfers.items(): filename = transfer_info['filename'] bytes_sent = transfer_info['bytes_sent'] filesize = transfer_info['filesize'] start_time = transfer_info['start_time'] # Вычисляем прогресс progress = f"{bytes_sent}/{filesize} байт" remaining_bytes = filesize - bytes_sent elapsed_time = time.time() - start_time # Вычисляем скорость передачи (байт/сек) if elapsed_time > 0: transfer_speed = bytes_sent / elapsed_time # Вычисляем оставшееся время if transfer_speed > 0: remaining_time = remaining_bytes / transfer_speed remaining_str = f"{remaining_bytes} байт (~{int(remaining_time)}с)" else: remaining_str = f"{remaining_bytes} байт (неизвестно)" else: remaining_str = f"{remaining_bytes} байт (вычисляется...)" # Добавляем запись в таблицу self.transfers_tree.insert("", END, values=( f"{client_addr[0]}:{client_addr[1]}", filename, progress, remaining_str, f"{elapsed_time:.1f}с" )) # Периодическое обновление информации о передачах def update_transfers_periodically(self): if self.tftp_server and self.tftp_server.running: self.update_transfers_info() # Планируем следующее обновление через 1 секунду self.after(1000, self.update_transfers_periodically) # Обновление списка сетевых адаптеров def update_network_adapters(self): adapters = get_network_adapters() self.tftp_ip_combo["values"] = adapters if not self.tftp_ip_var.get() in adapters: self.tftp_ip_var.set(adapters[0]) # Добавляем новый метод для создания статус бара def create_status_bar(self): self.status_bar = ttk.Frame(self) self.status_bar.pack(side=tk.BOTTOM, fill=tk.X) # Создаем фрейм для индикаторов indicators_frame = ttk.Frame(self.status_bar) indicators_frame.pack(side=tk.LEFT) # Индикатор подключения к коммутатору self.connection_indicator_frame = ttk.Frame(indicators_frame) self.connection_indicator_frame.pack(side=tk.LEFT) ttk.Label(self.connection_indicator_frame, text="COM:", padding=(5, 2)).pack(side=tk.LEFT) self.connection_indicator = ttk.Label( self.connection_indicator_frame, text="⬤", font=("Segoe UI", 10), width=2, anchor='center', padding=(2, 2) ) self.connection_indicator.pack(side=tk.LEFT) # Индикатор TFTP сервера self.tftp_indicator_frame = ttk.Frame(indicators_frame) self.tftp_indicator_frame.pack(side=tk.LEFT, padx=10) ttk.Label(self.tftp_indicator_frame, text="TFTP:", padding=(5, 2)).pack(side=tk.LEFT) self.tftp_indicator = ttk.Label( self.tftp_indicator_frame, text="⬤", font=("Segoe UI", 10), width=2, anchor='center', padding=(2, 2) ) self.tftp_indicator.pack(side=tk.LEFT) ttk.Separator(self.status_bar, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=5) # Остальные элементы статус бара... self.port_label = ttk.Label(self.status_bar, text="", padding=(5, 2)) self.port_label.pack(side=tk.LEFT) ttk.Separator(self.status_bar, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=5) self.baudrate_label = ttk.Label(self.status_bar, text="", padding=(5, 2)) self.baudrate_label.pack(side=tk.LEFT) ttk.Separator(self.status_bar, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=5) self.copy_mode_label = ttk.Label(self.status_bar, text="", padding=(5, 2)) self.copy_mode_label.pack(side=tk.LEFT) ttk.Separator(self.status_bar, orient=tk.VERTICAL).pack(side=tk.LEFT, fill=tk.Y, padx=5) self.version_label = ttk.Label(self.status_bar, text=f"Версия: {VERSION}", padding=(5, 2)) self.version_label.pack(side=tk.RIGHT) # Обновляем метод update_status_bar def update_status_bar(self): """Обновление статус-бара""" # Проверяем реальное состояние подключения is_connected = bool(self.connection and self.connection.is_open) self.update_connection_indicator(is_connected) # Обновляем индикатор TFTP сервера if self.tftp_server and self.tftp_server.running: self.tftp_indicator.configure(text="⬤", foreground='green') else: self.tftp_indicator.configure(text="⬤", foreground='red') # Остальные обновления статус бара port = self.settings.get("port", "Не выбран") baudrate = self.settings.get("baudrate", "9600") copy_mode = "Построчный" if self.settings.get("copy_mode") == "line" else "Блочный" self.port_label.config(text=f"Порт: {port}") self.baudrate_label.config(text=f"Скорость: {baudrate}") self.copy_mode_label.config(text=f"Режим: {copy_mode}") # ========================== # Основной запуск приложения # ========================== def main(): setup_logging() settings = settings_load() app = SerialAppGUI(settings) app.mainloop() # ========================== # Основной запуск приложения # ========================== if __name__ == "__main__": try: main() except KeyboardInterrupt: logging.info("Программа прервана пользователем (KeyboardInterrupt).") sys.exit(0) except Exception as e: logging.critical(f"Неизвестная ошибка: {e}", exc_info=True) sys.exit(1)