#!/usr/bin/env python3 # -*- coding: utf-8 -*- # ------------------------------------------------------------ # Это программа для копирования конфигураций на коммутаторы # ------------------------------------------------------------ import json import logging import os import re import sys import threading import time import webbrowser from getpass import getpass from logging.handlers import RotatingFileHandler import tkinter as tk from tkinter import ( StringVar, END, BOTH, LEFT, X, W, filedialog, messagebox, simpledialog, ) from tkinter import ttk import serial import serial.tools.list_ports from serial.serialutil import SerialException from about_window import AboutWindow from TFTPServer import TFTPServer import socket from update_checker import UpdateChecker # Версия программы VERSION = "1.0.1" # Создаем необходимые папки os.makedirs("Logs", exist_ok=True) os.makedirs("Configs", exist_ok=True) os.makedirs("Settings", exist_ok=True) os.makedirs("Firmware", exist_ok=True) os.makedirs("docs", exist_ok=True) # Файл настроек находится в папке Settings SETTINGS_FILE = os.path.join("Settings", "settings.json") # Настройка логирования с использованием RotatingFileHandler. def setup_logging(): logger = logging.getLogger() logger.setLevel(logging.DEBUG) log_path = os.path.join("Logs", "app.log") handler = RotatingFileHandler( log_path, maxBytes=5 * 1024 * 1024, backupCount=3, encoding="utf-8" ) formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") handler.setFormatter(formatter) logger.addHandler(handler) # Загрузка настроек из JSON-файла или создание настроек по умолчанию. def settings_load(): default_settings = { "port": None, # Порт для подключения "baudrate": 9600, # Скорость передачи данных "config_file": None, # Файл конфигурации "login": None, # Логин для подключения "password": None, # Пароль для подключения "timeout": 10, # Таймаут подключения "copy_mode": "line", # Режим копирования "block_size": 15, # Размер блока команд "prompt": ">", # Используется для определения приглашения } # Создаем папку Settings, если её нет os.makedirs("Settings", exist_ok=True) if not os.path.exists(SETTINGS_FILE): try: # При первом запуске создаем новый файл настроек with open(SETTINGS_FILE, "w", encoding="utf-8") as f: json.dump(default_settings, f, indent=4, ensure_ascii=False) logging.info("Файл настроек создан с настройками по умолчанию.") return default_settings except Exception as e: logging.error(f"Ошибка при создании файла настроек: {e}", exc_info=True) return default_settings try: # Пытаемся загрузить существующие настройки with open(SETTINGS_FILE, "r", encoding="utf-8") as f: settings = json.load(f) # Проверяем наличие всех необходимых параметров settings_changed = False for key, value in default_settings.items(): if key not in settings: settings[key] = value settings_changed = True # Если были добавлены новые параметры, сохраняем обновленные настройки if settings_changed: try: with open(SETTINGS_FILE, "w", encoding="utf-8") as f: json.dump(settings, f, indent=4, ensure_ascii=False) logging.info("Файл настроек обновлен с новыми параметрами.") except Exception as e: logging.error(f"Ошибка при обновлении файла настроек: {e}", exc_info=True) return settings except Exception as e: logging.error(f"Ошибка загрузки файла настроек: {e}", exc_info=True) return default_settings # Сохранение настроек в JSON-файл def settings_save(settings): try: with open(SETTINGS_FILE, "w", encoding="utf-8") as f: json.dump(settings, f, indent=4, ensure_ascii=False) logging.info("Настройки сохранены в файл.") except Exception as e: logging.error(f"Ошибка при сохранении настроек: {e}", exc_info=True) # Получение списка доступных последовательных портов def list_serial_ports(): """Получение списка доступных последовательных портов.""" ports = serial.tools.list_ports.comports() logging.debug(f"Найдено {len(ports)} серийных портов.") return [port.device for port in ports] # Получение списка IP-адресов из сетевых адаптеров def get_network_adapters(): adapters = [] try: # Получаем имя хоста hostname = socket.gethostname() # Получаем все адреса для данного хоста addresses = socket.getaddrinfo(hostname, None) # Создаем множество для хранения уникальных IP-адресов unique_ips = set() for addr in addresses: ip = addr[4][0] # Пропускаем IPv6 и локальные адреса if ':' not in ip and not ip.startswith('127.'): unique_ips.add(ip) # Добавляем все найденные IP-адреса в список for ip in sorted(unique_ips): adapters.append(f"{ip}") # Добавляем 0.0.0.0 для прослушивания всех интерфейсов adapters.insert(0, "0.0.0.0") except Exception as e: logging.error(f"Ошибка при получении списка сетевых адаптеров: {e}", exc_info=True) # В случае ошибки возвращаем хотя бы 0.0.0.0 adapters = ["0.0.0.0"] return adapters # Создание соединения с устройством через последовательный порт def create_connection(settings): try: conn = serial.Serial(settings["port"], settings["baudrate"], timeout=1) logging.info(f"Соединение установлено с {settings['port']} на {settings['baudrate']}.") time.sleep(1) return conn except SerialException as e: logging.error(f"Ошибка подключения: {e}", exc_info=True) except Exception as e: logging.error(f"Неизвестная ошибка подключения: {e}", exc_info=True) return None # Проверка наличия логина и пароля в настройках и отправка их на устройство def send_login_password(serial_connection, login=None, password=None, is_gui=False): if not login: if is_gui: login = simpledialog.askstring("Login", "Введите логин:") if login is None: login = "" else: login = input("Введите логин: ") if not password: if is_gui: password = simpledialog.askstring("Password", "Введите пароль:", show="*") if password is None: password = "" else: password = getpass("Введите пароль: ") # Чтение ответа от устройства с учётом таймаута. def read_response(serial_connection, timeout, login=None, password=None, is_gui=False): response = b"" end_time = time.time() + timeout decoded = "" while time.time() < end_time: if serial_connection.in_waiting: chunk = serial_connection.read(serial_connection.in_waiting) response += chunk if b"--More--" in response: serial_connection.write(b"\n") response = response.replace(b"--More--", b"") try: decoded = response.decode(errors="ignore") except Exception: decoded = "" lines = decoded.rstrip().splitlines() if lines: last_line = lines[-1].strip() if re.search(r'(login:|username:)$', last_line, re.IGNORECASE): send_login_password(serial_connection, login, None, is_gui) response = b"" continue if re.search(r'(password:)$', last_line, re.IGNORECASE): send_login_password(serial_connection, None, password, is_gui) response = b"" continue if last_line.endswith(">") or last_line.endswith("#"): break else: time.sleep(0.1) return decoded # Генерация блоков команд для блочного копирования def generate_command_blocks(lines, block_size): blocks = [] current_block = [] for line in lines: trimmed = line.strip() if not trimmed: continue lower_line = trimmed.lower() if lower_line.startswith("vlan") or lower_line.startswith("enable") or lower_line.startswith("interface"): if current_block: blocks.append("\n".join(current_block)) current_block = [] blocks.append(trimmed) elif lower_line.startswith("exit"): current_block.append(trimmed) blocks.append("\n".join(current_block)) current_block = [] else: current_block.append(trimmed) if len(current_block) >= block_size: blocks.append("\n".join(current_block)) current_block = [] if current_block: blocks.append("\n".join(current_block)) return blocks # Выполнение команд из файла конфигурации def execute_commands_from_file( serial_connection, filename, timeout, copy_mode, block_size, log_callback=None, login=None, password=None, is_gui=False, ): try: with open(filename, "r", encoding="utf-8") as file: lines = [line for line in file if line.strip()] msg = f"Выполнение команд из файла: {filename}\n" logging.info(msg) if log_callback: log_callback(msg) # Если выбран построчный режим if copy_mode == "line": for cmd in lines: cmd = cmd.strip() max_attempts = 3 attempt = 0 while attempt < max_attempts: msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n" if log_callback: log_callback(msg) serial_connection.write((cmd + "\n").encode()) logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})") response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui) if response: if '^' in response: msg = ( f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n" f"Ответ устройства:\n{response}\n" f"Повторная отправка команды...\n" ) if log_callback: log_callback(msg) logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.") attempt += 1 time.sleep(1) continue else: msg = f"Ответ устройства:\n{response}\n" if log_callback: log_callback(msg) logging.info(f"Ответ устройства:\n{response}") break else: msg = f"Ответ не получен для команды: {cmd}\n" if log_callback: log_callback(msg) logging.warning(f"Нет ответа для команды: {cmd}") break if attempt == max_attempts: msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n" if log_callback: log_callback(msg) logging.error(msg) time.sleep(1) # Если выбран блочный режим elif copy_mode == "block": blocks = generate_command_blocks(lines, block_size) for block in blocks: msg = f"\nОтправка блока команд:\n{block}\n" if log_callback: log_callback(msg) serial_connection.write((block + "\n").encode()) logging.info(f"Отправлен блок команд:\n{block}") response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui) # Если обнаружена ошибка в ответе на блок, отправляем команды по очереди if response and '^' in response: msg = ( f"[WARNING] Обнаружена ошибка при выполнении блока команд.\n" f"Ответ устройства:\n{response}\n" f"Пересылаются команды по отдельности...\n" ) if log_callback: log_callback(msg) logging.warning("Ошибка в блочном режиме – отправляем команды индивидуально.") for line in block.splitlines(): cmd = line.strip() if not cmd: continue max_attempts = 3 attempt = 0 while attempt < max_attempts: sub_msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n" if log_callback: log_callback(sub_msg) serial_connection.write((cmd + "\n").encode()) logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})") sub_response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui) if sub_response: if '^' in sub_response: sub_msg = ( f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n" f"Ответ устройства:\n{sub_response}\n" f"Повторная отправка команды...\n" ) if log_callback: log_callback(sub_msg) logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.") attempt += 1 time.sleep(1) continue else: sub_msg = f"Ответ устройства:\n{sub_response}\n" if log_callback: log_callback(sub_msg) logging.info(f"Ответ устройства:\n{sub_response}") break else: sub_msg = f"Ответ не получен для команды: {cmd}\n" if log_callback: log_callback(sub_msg) logging.warning(f"Нет ответа для команды: {cmd}") break if attempt == max_attempts: sub_msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n" if log_callback: log_callback(sub_msg) logging.error(sub_msg) time.sleep(1) else: if response: msg = f"Ответ устройства:\n{response}\n" if log_callback: log_callback(msg) logging.info(f"Ответ устройства:\n{response}") else: msg = f"Ответ не получен для блока:\n{block}\n" if log_callback: log_callback(msg) logging.warning(f"Нет ответа для блока:\n{block}") time.sleep(1) except SerialException as e: msg = f"Ошибка при выполнении команды: {e}\n" if log_callback: log_callback(msg) logging.error(f"Ошибка при выполнении команды: {e}", exc_info=True) except Exception as e: msg = f"Ошибка при выполнении команд: {e}\n" if log_callback: log_callback(msg) logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True) # Базовый класс для кастомных виджетов с поддержкой контекстного меню и горячих клавиш class CustomWidgetBase: def __init__(self): self.create_context_menu() self.bind_shortcuts() # Создание контекстного меню def create_context_menu(self): """Создание контекстного меню""" self.context_menu = tk.Menu(self, tearoff=0) self.context_menu.add_command(label="Вырезать", command=self.cut) self.context_menu.add_command(label="Копировать", command=self.copy) self.context_menu.add_command(label="Вставить", command=self.paste) self.context_menu.add_separator() self.context_menu.add_command(label="Выделить всё", command=self.select_all) self.bind("", self.show_context_menu) # Привязка горячих клавиш def bind_shortcuts(self): """Привязка горячих клавиш""" # Стандартные сочетания self.bind("", lambda e: self.event_generate("<>")) self.bind("", lambda e: self.event_generate("<>")) self.bind("", lambda e: self.event_generate("<>")) self.bind("", self.select_all) # Дополнительные сочетания self.bind("", lambda e: self.event_generate("<>")) self.bind("", lambda e: self.event_generate("<>")) self.bind("", lambda e: self.event_generate("<>")) # Отображение контекстного меню def show_context_menu(self, event): """Отображение контекстного меню""" self.context_menu.post(event.x_root, event.y_root) # Вырезание текста def cut(self): self.event_generate("<>") # Копирование текста def copy(self): self.event_generate("<>") # Вставка текста def paste(self): self.event_generate("<>") # Класс для текстового поля с поддержкой контекстного меню и горячих клавиш class CustomText(tk.Text, CustomWidgetBase): def __init__(self, *args, **kwargs): tk.Text.__init__(self, *args, **kwargs) CustomWidgetBase.__init__(self) def select_all(self, event=None): """Выделение всего текста в Text виджете""" self.tag_add(tk.SEL, "1.0", tk.END) self.mark_set(tk.INSERT, "1.0") self.see(tk.INSERT) return "break" # Класс для поля ввода с поддержкой контекстного меню и горячих клавиш class CustomEntry(ttk.Entry, CustomWidgetBase): def __init__(self, *args, **kwargs): ttk.Entry.__init__(self, *args, **kwargs) CustomWidgetBase.__init__(self) def select_all(self, event=None): """Выделение всего текста в Entry виджете""" self.select_range(0, tk.END) return "break" # Класс для окна настроек class SettingsWindow(tk.Toplevel): def __init__(self, parent, settings, callback=None): super().__init__(parent) self.title("Настройки") self.geometry("600x400") self.settings = settings self.callback = callback self.resizable(False, False) # Создаем фрейм для настроек settings_frame = ttk.Frame(self, padding="10") settings_frame.pack(fill=BOTH, expand=True) # COM порт ttk.Label(settings_frame, text="COM порт:").grid(row=0, column=0, sticky=W, pady=5) self.port_var = StringVar(value=settings.get("port", "")) self.port_combo = ttk.Combobox(settings_frame, textvariable=self.port_var) self.port_combo.grid(row=0, column=1, sticky=W, pady=5) ttk.Button(settings_frame, text="Обновить", command=self.update_ports).grid(row=0, column=2, padx=5) # Скорость передачи ttk.Label(settings_frame, text="Скорость:").grid(row=1, column=0, sticky=W, pady=5) self.baudrate_var = StringVar(value=str(settings.get("baudrate", 9600))) baudrate_combo = ttk.Combobox(settings_frame, textvariable=self.baudrate_var, values=["9600", "19200", "38400", "57600", "115200"]) baudrate_combo.grid(row=1, column=1, sticky=W, pady=5) # Таймаут ttk.Label(settings_frame, text="Таймаут (сек):").grid(row=2, column=0, sticky=W, pady=5) self.timeout_var = StringVar(value=str(settings.get("timeout", 10))) timeout_entry = ttk.Entry(settings_frame, textvariable=self.timeout_var) timeout_entry.grid(row=2, column=1, sticky=W, pady=5) # Режим копирования ttk.Label(settings_frame, text="Режим копирования:").grid(row=3, column=0, sticky=W, pady=5) self.copy_mode_var = StringVar(value=settings.get("copy_mode", "line")) copy_mode_frame = ttk.Frame(settings_frame) copy_mode_frame.grid(row=3, column=1, sticky=W, pady=5) ttk.Radiobutton(copy_mode_frame, text="Построчно", value="line", variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT) ttk.Radiobutton(copy_mode_frame, text="Блоками", value="block", variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT) # Размер блока self.block_size_label = ttk.Label(settings_frame, text="Размер блока:") self.block_size_label.grid(row=4, column=0, sticky=W, pady=5) self.block_size_var = StringVar(value=str(settings.get("block_size", 15))) self.block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var) self.block_size_entry.grid(row=4, column=1, sticky=W, pady=5) # Приглашение командной строки ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5) self.prompt_var = StringVar(value=settings.get("prompt", ">")) prompt_entry = CustomEntry(settings_frame, textvariable=self.prompt_var) prompt_entry.grid(row=5, column=1, sticky=W, pady=5) # Кнопки button_frame = ttk.Frame(settings_frame) button_frame.grid(row=6, column=0, columnspan=3, pady=20) ttk.Button(button_frame, text="Сохранить", command=self.save_settings).pack(side=LEFT, padx=5) ttk.Button(button_frame, text="Отмена", command=self.destroy).pack(side=LEFT, padx=5) self.update_ports() # Инициализация видимости поля размера блока self.toggle_block_size() # Центрируем окно self.center_window() # Переключение видимости поля размера блока def toggle_block_size(self): if self.copy_mode_var.get() == "line": self.block_size_label.grid_remove() self.block_size_entry.grid_remove() else: self.block_size_label.grid() self.block_size_entry.grid() # Центрирование окна def center_window(self): self.update_idletasks() width = self.winfo_width() height = self.winfo_height() x = (self.winfo_screenwidth() // 2) - (width // 2) y = (self.winfo_screenheight() // 2) - (height // 2) self.geometry(f"{width}x{height}+{x}+{y}") # Обновление списка доступных последовательных портов def update_ports(self): ports = list_serial_ports() self.port_combo["values"] = ports if ports and not self.port_var.get(): self.port_var.set(ports[0]) # Сохранение настроек def save_settings(self): try: self.settings.update({ "port": self.port_var.get(), "baudrate": int(self.baudrate_var.get()), "timeout": int(self.timeout_var.get()), "copy_mode": self.copy_mode_var.get(), "block_size": int(self.block_size_var.get()), "prompt": self.prompt_var.get() }) settings_save(self.settings) if self.callback: self.callback() self.destroy() messagebox.showinfo("Успех", "Настройки успешно сохранены") except ValueError as e: messagebox.showerror("Ошибка", "Проверьте правильность введенных значений") # Общая функция для добавления текста в текстовое поле def append_text_to_widget(widget, text): # Проверяем, заканчивается ли текст символом новой строки if not text.endswith('\n'): text += '\n' widget.insert(END, text) widget.see(END) # Общая функция для выбора файла конфигурации def select_config_file(self, var, save_to_settings=False): filename = filedialog.askopenfilename( title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")] ) if filename: var.set(filename) if save_to_settings: self.settings["config_file"] = filename settings_save(self.settings) # Общая функция для отправки команды и обработки ответа def send_command_and_process_response( serial_connection, cmd, timeout, max_attempts=3, log_callback=None, login=None, password=None, is_gui=False ): attempt = 0 while attempt < max_attempts: msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n" if log_callback: log_callback(msg) serial_connection.write((cmd + "\n").encode()) logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})") response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui) if response: if '^' in response: msg = ( f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n" f"Ответ устройства:\n{response}\n" f"Повторная отправка команды...\n" ) if log_callback: log_callback(msg) logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.") attempt += 1 time.sleep(1) continue else: msg = f"Ответ устройства:\n{response}\n" if log_callback: log_callback(msg) logging.info(f"Ответ устройства:\n{response}") return True, response else: msg = f"Ответ не получен для команды: {cmd}\n" if log_callback: log_callback(msg) logging.warning(f"Нет ответа для команды: {cmd}") return False, None msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n" if log_callback: log_callback(msg) logging.error(msg) return False, None # Основной класс для графического интерфейса class SerialAppGUI(tk.Tk): def __init__(self, settings): super().__init__() self.title("ComConfigCopy") self.geometry("900x700") # Добавляем VERSION как атрибут класса self.VERSION = VERSION # Инициализация проверки обновлений self.update_checker = UpdateChecker( VERSION, "https://gitea.filow.ru/LowaSC/ComConfigCopy", include_prereleases=False # Проверяем только стабильные версии ) # Настройка стиля self.style = ttk.Style(self) self.style.theme_use("clam") default_font = ("Segoe UI", 10) self.option_add("*Font", default_font) self.settings = settings self.connection = None self.tftp_server = None self.create_menu() self.create_tabs() # Проверка первого запуска self.check_first_run() # Создание меню def create_menu(self): menubar = tk.Menu(self) self.config(menu=menubar) # Меню "Файл" file_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="Файл", menu=file_menu) file_menu.add_command(label="Настройки", command=self.open_settings) file_menu.add_separator() file_menu.add_command(label="Выход", command=self.quit) # Меню "Справка" help_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="Справка", menu=help_menu) help_menu.add_command(label="Проверить обновления", command=self.check_for_updates) help_menu.add_separator() help_menu.add_command(label="О программе", command=self.open_about) # Проверка наличия обновлений def check_for_updates(self): def on_update_check(update_available, error): if error: messagebox.showerror( "Ошибка проверки обновлений", f"Не удалось проверить наличие обновлений:\n{error}" ) elif update_available: release_info = self.update_checker.get_release_notes() if release_info: # Форматируем сообщение message = ( f"Доступна новая версия {release_info['version']}!\n\n" f"Тип релиза: {'Пре-релиз' if release_info['type'] == 'prerelease' else 'Стабильный'}\n\n" "Изменения:\n" f"{release_info['description']}\n\n" "Хотите перейти на страницу загрузки?" ) response = messagebox.askyesno( "Доступно обновление", message, ) if response: webbrowser.open(release_info["link"]) else: messagebox.showerror( "Ошибка", "Не удалось получить информацию о новой версии" ) else: messagebox.showinfo( "Проверка обновлений", "У вас установлена последняя версия программы" ) self.update_checker.check_updates(callback=on_update_check) # Обработчик изменения настроек def on_settings_changed(self): self.settings = settings_load() # Открытие окна настроек def open_settings(self): settings_window = SettingsWindow(self, self.settings, self.on_settings_changed) settings_window.transient(self) settings_window.grab_set() # Проверка первого запуска программы def check_first_run(self): show_settings = False # Проверяем существование файла настроек if not os.path.exists(SETTINGS_FILE): show_settings = True else: # Проверяем содержимое файла настроек try: with open(SETTINGS_FILE, "r", encoding="utf-8") as f: settings = json.load(f) # Если порт не настроен, считаем это первым запуском if settings.get("port") is None: show_settings = True except Exception: # Если файл поврежден или не читается, тоже показываем настройки show_settings = True if show_settings: # Создаем папку Settings, если её нет os.makedirs("Settings", exist_ok=True) response = messagebox.askyesno( "Первый запуск", "Это первый запуск программы. Хотите настроить параметры подключения сейчас?" ) if response: self.open_settings() # Создание вкладок def create_tabs(self): self.notebook = ttk.Notebook(self) self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5) # Создаем вкладки interactive_frame = ttk.Frame(self.notebook) file_exec_frame = ttk.Frame(self.notebook) config_editor_frame = ttk.Frame(self.notebook) tftp_frame = ttk.Frame(self.notebook) self.notebook.add(interactive_frame, text="Интерактивный режим") self.notebook.add(file_exec_frame, text="Выполнение файла") self.notebook.add(config_editor_frame, text="Редактор конфигурации") self.notebook.add(tftp_frame, text="TFTP Сервер") self.create_interactive_tab(interactive_frame) self.create_file_exec_tab(file_exec_frame) self.create_config_editor_tab(config_editor_frame) self.create_tftp_tab(tftp_frame) # Создание вкладки "Интерактивный режим" def create_interactive_tab(self, frame): control_frame = ttk.Frame(frame) control_frame.pack(fill=X, pady=5) ttk.Button(control_frame, text="Подключиться", command=self.connect_device).pack(side=LEFT, padx=5) ttk.Button(control_frame, text="Отключиться", command=self.disconnect_device).pack(side=LEFT, padx=5) self.interactive_text = CustomText(frame, wrap="word", height=20) self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5) input_frame = ttk.Frame(frame) input_frame.pack(fill=X, pady=5) ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5) self.command_entry = CustomEntry(input_frame, width=50) self.command_entry.pack(side=LEFT, padx=5) ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5) # Подключение к устройству def connect_device(self): if self.connection: messagebox.showinfo("Информация", "Уже подключено.") return if not self.settings.get("port"): messagebox.showerror("Ошибка", "COM-порт не выбран!") return self.connection = create_connection(self.settings) if self.connection: self.append_interactive_text("[INFO] Подключение установлено.\n") else: self.append_interactive_text("[ERROR] Не удалось установить соединение.\n") # Отключение от устройства def disconnect_device(self): if self.connection: try: self.connection.close() except Exception: pass self.connection = None self.append_interactive_text("[INFO] Соединение закрыто.\n") else: messagebox.showinfo("Информация", "Соединение не установлено.") # Отправка команды def send_command(self): if not self.connection: messagebox.showerror("Ошибка", "Сначала установите соединение!") return cmd = self.command_entry.get().strip() if not cmd: return self.append_interactive_text(f"[INFO] Отправка команды: {cmd}\n") threading.Thread(target=self.process_command, args=(cmd,), daemon=True).start() # Обработка команды def process_command(self, cmd): try: success, response = send_command_and_process_response( self.connection, cmd, self.settings.get("timeout", 10), max_attempts=3, log_callback=self.append_interactive_text, login=self.settings.get("login"), password=self.settings.get("password"), is_gui=True ) except SerialException as e: self.append_interactive_text(f"[ERROR] Ошибка при отправке команды: {e}\n") logging.error(f"Ошибка отправки команды: {e}", exc_info=True) except Exception as e: self.append_interactive_text(f"[ERROR] Неизвестная ошибка: {e}\n") logging.error(f"Неизвестная ошибка: {e}", exc_info=True) # Добавление текста в текстовое поле def append_interactive_text(self, text): append_text_to_widget(self.interactive_text, text) # Создание вкладки "Выполнить команды из файла" def create_file_exec_tab(self, frame): file_frame = ttk.Frame(frame) file_frame.pack(fill=X, pady=5) ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5) self.file_exec_var = StringVar(value=self.settings.get("config_file") or "") CustomEntry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5) ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5) ttk.Button(frame, text="Выполнить команды", command=self.execute_file_commands).pack(pady=5) self.file_exec_text = CustomText(frame, wrap="word", height=15) self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5) # Выбор файла конфигурации для выполнения команд def select_config_file_fileexec(self): select_config_file(self, self.file_exec_var) # Выполнение команд из файла def execute_file_commands(self): if not self.settings.get("port"): messagebox.showerror("Ошибка", "COM-порт не выбран!") return if not self.file_exec_var.get(): messagebox.showerror("Ошибка", "Файл конфигурации не выбран!") return if not self.connection: self.connection = create_connection(self.settings) if not self.connection: self.append_file_exec_text("[ERROR] Не удалось установить соединение.\n") return threading.Thread( target=execute_commands_from_file, args=( self.connection, self.file_exec_var.get(), self.settings.get("timeout", 10), self.settings.get("copy_mode", "line"), self.settings.get("block_size", 15), self.append_file_exec_text, self.settings.get("login"), self.settings.get("password"), True, ), daemon=True, ).start() def append_file_exec_text(self, text): append_text_to_widget(self.file_exec_text, text) # Создание вкладки "Редактор конфигурационного файла" def create_config_editor_tab(self, frame): top_frame = ttk.Frame(frame) top_frame.pack(fill=X, pady=5) ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5) self.editor_file_var = StringVar(value=self.settings.get("config_file") or "") CustomEntry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5) ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5) ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5) ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5) self.config_editor_text = CustomText(frame, wrap="word") self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5) # Выбор файла конфигурации для редактирования def select_config_file_editor(self): select_config_file(self, self.editor_file_var, save_to_settings=True) # Загрузка файла конфигурации def load_config_file(self): filename = self.editor_file_var.get() if not filename or not os.path.exists(filename): messagebox.showerror("Ошибка", "Файл конфигурации не выбран или не существует.") return try: with open(filename, "r", encoding="utf-8") as f: content = f.read() self.config_editor_text.delete("1.0", END) self.config_editor_text.insert(END, content) messagebox.showinfo("Информация", "Файл загружен.") except Exception as e: logging.error(f"Ошибка загрузки файла: {e}", exc_info=True) messagebox.showerror("Ошибка", f"Не удалось загрузить файл:\n{e}") # Сохранение файла конфигурации def save_config_file(self): filename = self.editor_file_var.get() if not filename: messagebox.showerror("Ошибка", "Файл конфигурации не выбран.") return try: content = self.config_editor_text.get("1.0", END) with open(filename, "w", encoding="utf-8") as f: f.write(content) messagebox.showinfo("Информация", "Файл сохранён.") except Exception as e: logging.error(f"Ошибка сохранения файла: {e}", exc_info=True) messagebox.showerror("Ошибка", f"Не удалось сохранить файл:\n{e}") # Открытие окна "О программе" def open_about(self): about_window = AboutWindow(self) about_window.transient(self) about_window.grab_set() # Создание вкладки TFTP сервера def create_tftp_tab(self, frame): # Создаем фрейм для управления TFTP сервером tftp_frame = ttk.Frame(frame) tftp_frame.pack(fill=BOTH, expand=True, padx=5, pady=5) # Создаем и размещаем элементы управления controls_frame = ttk.LabelFrame(tftp_frame, text="Управление TFTP сервером") controls_frame.pack(fill=X, padx=5, pady=5) # IP адрес ip_frame = ttk.Frame(controls_frame) ip_frame.pack(fill=X, padx=5, pady=2) ttk.Label(ip_frame, text="IP адрес:").pack(side=LEFT, padx=5) self.tftp_ip_var = StringVar(value="0.0.0.0") self.tftp_ip_combo = ttk.Combobox(ip_frame, textvariable=self.tftp_ip_var, state="readonly") self.tftp_ip_combo.pack(side=LEFT, fill=X, expand=True, padx=5) ttk.Button(ip_frame, text="Обновить", command=self.update_network_adapters).pack(side=LEFT, padx=5) # Заполняем список адаптеров self.update_network_adapters() # Порт port_frame = ttk.Frame(controls_frame) port_frame.pack(fill=X, padx=5, pady=2) ttk.Label(port_frame, text="Порт:").pack(side=LEFT, padx=5) self.tftp_port_var = StringVar(value="69") self.tftp_port_entry = CustomEntry(port_frame, textvariable=self.tftp_port_var) self.tftp_port_entry.pack(fill=X, expand=True, padx=5) # Кнопки управления buttons_frame = ttk.Frame(controls_frame) buttons_frame.pack(fill=X, padx=5, pady=5) self.start_tftp_button = ttk.Button( buttons_frame, text="Запустить сервер", command=self.start_tftp_server ) self.start_tftp_button.pack(side=LEFT, padx=5) self.stop_tftp_button = ttk.Button( buttons_frame, text="Остановить сервер", command=self.stop_tftp_server, state="disabled" ) self.stop_tftp_button.pack(side=LEFT, padx=5) # Лог сервера log_frame = ttk.LabelFrame(tftp_frame, text="Лог сервера") log_frame.pack(fill=BOTH, expand=True, padx=5, pady=5) self.tftp_log_text = CustomText(log_frame, wrap=tk.WORD, height=10) self.tftp_log_text.pack(fill=BOTH, expand=True, padx=5, pady=5) # Добавляем скроллбар для лога scrollbar = ttk.Scrollbar(self.tftp_log_text, command=self.tftp_log_text.yview) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) self.tftp_log_text.config(yscrollcommand=scrollbar.set) # Статус передач transfers_frame = ttk.LabelFrame(tftp_frame, text="Активные передачи") transfers_frame.pack(fill=BOTH, expand=True, padx=5, pady=5) # Создаем таблицу для отображения активных передач columns = ("client", "filename", "progress", "remaining", "time") self.transfers_tree = ttk.Treeview(transfers_frame, columns=columns, show="headings") # Настраиваем заголовки колонок self.transfers_tree.heading("client", text="Клиент") self.transfers_tree.heading("filename", text="Файл") self.transfers_tree.heading("progress", text="Прогресс") self.transfers_tree.heading("remaining", text="Осталось") self.transfers_tree.heading("time", text="Время") # Настраиваем ширину колонок self.transfers_tree.column("client", width=120) self.transfers_tree.column("filename", width=150) self.transfers_tree.column("progress", width=100) self.transfers_tree.column("remaining", width=100) self.transfers_tree.column("time", width=80) self.transfers_tree.pack(fill=BOTH, expand=True, padx=5, pady=5) # Инициализация TFTP сервера self.tftp_server = None self.tftp_server_thread = None # Запуск TFTP сервера def start_tftp_server(self): try: # Получаем выбранный IP-адрес ip = self.tftp_ip_var.get() if not ip: messagebox.showerror("Ошибка", "Выберите IP-адрес для TFTP сервера") return # Проверяем корректность порта try: port = int(self.tftp_port_var.get()) if port <= 0 or port > 65535: raise ValueError("Порт должен быть в диапазоне 1-65535") except ValueError as e: messagebox.showerror("Ошибка", f"Некорректный порт: {str(e)}") return # Создаем экземпляр TFTP сервера self.tftp_server = TFTPServer("Firmware") # Устанавливаем callback для логирования def log_callback(message): # Фильтруем дублирующиеся сообщения о запуске/остановке сервера if "[INFO] TFTP сервер запущен" in message and hasattr(self, '_server_started'): return if "[INFO] TFTP сервер остановлен" in message and hasattr(self, '_server_stopped'): return self.append_tftp_log(message) # Устанавливаем флаги для отслеживания состояния if "[INFO] TFTP сервер запущен" in message: self._server_started = True elif "[INFO] TFTP сервер остановлен" in message: self._server_stopped = True # Обновляем информацию о передачах self.update_transfers_info() self.tftp_server.set_log_callback(log_callback) # Запускаем сервер в отдельном потоке self.tftp_server_thread = threading.Thread( target=self.run_tftp_server, args=(ip, port), daemon=True ) self.tftp_server_thread.start() # Обновляем состояние кнопок и элементов управления self.start_tftp_button.config(state="disabled") self.stop_tftp_button.config(state="normal") self.tftp_ip_combo.config(state="disabled") self.tftp_port_entry.config(state="disabled") # Запускаем периодическое обновление информации о передачах self.update_transfers_periodically() except Exception as e: self.append_tftp_log(f"[ERROR] Ошибка запуска сервера: {str(e)}") messagebox.showerror("Ошибка", f"Не удалось запустить TFTP сервер: {str(e)}") # Запуск TFTP сервера в отдельном потоке def run_tftp_server(self, ip, port): try: self.tftp_server.start_server(ip, port) except Exception as e: self.append_tftp_log(f"[ERROR] Ошибка работы сервера: {str(e)}") # Остановка TFTP сервера def stop_tftp_server(self): if self.tftp_server: try: # Отключаем кнопки на время остановки сервера self.start_tftp_button.config(state="disabled") self.stop_tftp_button.config(state="disabled") # Сбрасываем флаги состояния if hasattr(self, '_server_started'): delattr(self, '_server_started') if hasattr(self, '_server_stopped'): delattr(self, '_server_stopped') # Останавливаем сервер self.tftp_server.stop_server() # Ждем завершения потока сервера с таймаутом if self.tftp_server_thread: self.tftp_server_thread.join(timeout=5.0) if self.tftp_server_thread.is_alive(): self.append_tftp_log("[WARN] Превышено время ожидания остановки сервера") # Очищаем ссылки на сервер и поток self.tftp_server = None self.tftp_server_thread = None # Обновляем состояние кнопок self.start_tftp_button.config(state="normal") self.stop_tftp_button.config(state="disabled") self.tftp_ip_combo.config(state="normal") self.tftp_port_entry.config(state="normal") # Очищаем таблицу передач for item in self.transfers_tree.get_children(): self.transfers_tree.delete(item) except Exception as e: self.append_tftp_log(f"[ERROR] Ошибка остановки сервера: {str(e)}") messagebox.showerror("Ошибка", f"Не удалось остановить TFTP сервер: {str(e)}") # Восстанавливаем состояние кнопок в случае ошибки self.start_tftp_button.config(state="disabled") self.stop_tftp_button.config(state="normal") def append_tftp_log(self, text): append_text_to_widget(self.tftp_log_text, text) # Обновление информации об активных передачах def update_transfers_info(self): if not self.tftp_server: return # Очищаем текущие записи for item in self.transfers_tree.get_children(): self.transfers_tree.delete(item) # Добавляем информацию о текущих передачах for client_addr, transfer_info in self.tftp_server.active_transfers.items(): filename = transfer_info['filename'] bytes_sent = transfer_info['bytes_sent'] filesize = transfer_info['filesize'] start_time = transfer_info['start_time'] # Вычисляем прогресс progress = f"{bytes_sent}/{filesize} байт" remaining_bytes = filesize - bytes_sent elapsed_time = time.time() - start_time # Вычисляем скорость передачи (байт/сек) if elapsed_time > 0: transfer_speed = bytes_sent / elapsed_time # Вычисляем оставшееся время if transfer_speed > 0: remaining_time = remaining_bytes / transfer_speed remaining_str = f"{remaining_bytes} байт (~{int(remaining_time)}с)" else: remaining_str = f"{remaining_bytes} байт (неизвестно)" else: remaining_str = f"{remaining_bytes} байт (вычисляется...)" # Добавляем запись в таблицу self.transfers_tree.insert("", END, values=( f"{client_addr[0]}:{client_addr[1]}", filename, progress, remaining_str, f"{elapsed_time:.1f}с" )) # Периодическое обновление информации о передачах def update_transfers_periodically(self): if self.tftp_server and self.tftp_server.running: self.update_transfers_info() # Планируем следующее обновление через 1 секунду self.after(1000, self.update_transfers_periodically) # Обновление списка сетевых адаптеров def update_network_adapters(self): adapters = get_network_adapters() self.tftp_ip_combo["values"] = adapters if not self.tftp_ip_var.get() in adapters: self.tftp_ip_var.set(adapters[0]) # ========================== # Основной запуск приложения # ========================== def main(): setup_logging() settings = settings_load() app = SerialAppGUI(settings) app.mainloop() # ========================== # Основной запуск приложения # ========================== if __name__ == "__main__": try: main() except KeyboardInterrupt: logging.info("Программа прервана пользователем (KeyboardInterrupt).") sys.exit(0) except Exception as e: logging.critical(f"Неизвестная ошибка: {e}", exc_info=True) sys.exit(1)