diff --git a/ComConfigCopy.py b/ComConfigCopy.py deleted file mode 100644 index a679b2c..0000000 --- a/ComConfigCopy.py +++ /dev/null @@ -1,1274 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# ------------------------------------------------------------ -# Это программа для копирования конфигураций на коммутаторы -# ------------------------------------------------------------ - - -# import argparse Использовался для получения аргументов из командной строки (не используется) -# import platform Использовался для получения списка сетевых адаптеров (не используется) -# import subprocess Использовался для получения списка сетевых адаптеров (не используется) -# import socket не используется -import json -import logging -import os -import re -import sys -import threading -import time -import webbrowser -from getpass import getpass -from logging.handlers import RotatingFileHandler -import tkinter as tk -from tkinter import ( - StringVar, - END, - BOTH, - LEFT, - X, - W, - filedialog, - messagebox, - simpledialog, -) -from tkinter import ttk - -import serial -import serial.tools.list_ports -from serial.serialutil import SerialException -from about_window import AboutWindow -from TFTPServer import TFTPServer -# from TFTPServer import TFTPServerThread -import socket -from update_checker import UpdateChecker - -# Версия программы -VERSION = "1.0.0" - -# Создаем необходимые папки -os.makedirs("Logs", exist_ok=True) -os.makedirs("Configs", exist_ok=True) -os.makedirs("Settings", exist_ok=True) -os.makedirs("Firmware", exist_ok=True) -os.makedirs("docs", exist_ok=True) - -# Файл настроек находится в папке Settings -SETTINGS_FILE = os.path.join("Settings", "settings.json") - -# ========================== -# Функции работы с настройками и логированием -# ========================== - -def setup_logging(): - """Настройка логирования с использованием RotatingFileHandler.""" - logger = logging.getLogger() - logger.setLevel(logging.DEBUG) - log_path = os.path.join("Logs", "app.log") - handler = RotatingFileHandler( - log_path, maxBytes=5 * 1024 * 1024, backupCount=3, encoding="utf-8" - ) - formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") - handler.setFormatter(formatter) - logger.addHandler(handler) - -def settings_load(): - """Загрузка настроек из JSON-файла или создание настроек по умолчанию.""" - default_settings = { - "port": None, # Порт для подключения - "baudrate": 9600, # Скорость передачи данных - "config_file": None, # Файл конфигурации - "login": None, # Логин для подключения - "password": None, # Пароль для подключения - "timeout": 10, # Таймаут подключения - "copy_mode": "line", # Режим копирования - "block_size": 15, # Размер блока команд - "prompt": ">", # Используется для определения приглашения - } - - # Создаем папку Settings, если её нет - os.makedirs("Settings", exist_ok=True) - - if not os.path.exists(SETTINGS_FILE): - try: - # При первом запуске создаем новый файл настроек - with open(SETTINGS_FILE, "w", encoding="utf-8") as f: - json.dump(default_settings, f, indent=4, ensure_ascii=False) - logging.info("Файл настроек создан с настройками по умолчанию.") - return default_settings - except Exception as e: - logging.error(f"Ошибка при создании файла настроек: {e}", exc_info=True) - return default_settings - - try: - # Пытаемся загрузить существующие настройки - with open(SETTINGS_FILE, "r", encoding="utf-8") as f: - settings = json.load(f) - - # Проверяем наличие всех необходимых параметров - settings_changed = False - for key, value in default_settings.items(): - if key not in settings: - settings[key] = value - settings_changed = True - - # Если были добавлены новые параметры, сохраняем обновленные настройки - if settings_changed: - try: - with open(SETTINGS_FILE, "w", encoding="utf-8") as f: - json.dump(settings, f, indent=4, ensure_ascii=False) - logging.info("Файл настроек обновлен с новыми параметрами.") - except Exception as e: - logging.error(f"Ошибка при обновлении файла настроек: {e}", exc_info=True) - - return settings - except Exception as e: - logging.error(f"Ошибка загрузки файла настроек: {e}", exc_info=True) - return default_settings - -def settings_save(settings): - """Сохранение настроек в JSON-файл.""" - try: - with open(SETTINGS_FILE, "w", encoding="utf-8") as f: - json.dump(settings, f, indent=4, ensure_ascii=False) - logging.info("Настройки сохранены в файл.") - except Exception as e: - logging.error(f"Ошибка при сохранении настроек: {e}", exc_info=True) - -def list_serial_ports(): - """Получение списка доступных последовательных портов.""" - ports = serial.tools.list_ports.comports() - logging.debug(f"Найдено {len(ports)} серийных портов.") - return [port.device for port in ports] - -# ========================== -# Функции работы с сетевыми адаптерами (не используются) -# ========================== - -def get_network_adapters(): - """Получение списка сетевых адаптеров и их IP-адресов.""" - adapters = [] - try: - # Получаем имя хоста - hostname = socket.gethostname() - # Получаем все адреса для данного хоста - addresses = socket.getaddrinfo(hostname, None) - - # Создаем множество для хранения уникальных IP-адресов - unique_ips = set() - - for addr in addresses: - ip = addr[4][0] - # Пропускаем IPv6 и локальные адреса - if ':' not in ip and not ip.startswith('127.'): - unique_ips.add(ip) - - # Добавляем все найденные IP-адреса в список - for ip in sorted(unique_ips): - adapters.append(f"{ip}") - - # Добавляем 0.0.0.0 для прослушивания всех интерфейсов - adapters.insert(0, "0.0.0.0") - - except Exception as e: - logging.error(f"Ошибка при получении списка сетевых адаптеров: {e}", exc_info=True) - # В случае ошибки возвращаем хотя бы 0.0.0.0 - adapters = ["0.0.0.0"] - - return adapters - -# ========================== -# Функции работы с COM-соединением -# ========================== - -def create_connection(settings): - """Создание соединения с устройством через последовательный порт.""" - try: - conn = serial.Serial(settings["port"], settings["baudrate"], timeout=1) - logging.info(f"Соединение установлено с {settings['port']} на {settings['baudrate']}.") - time.sleep(1) - return conn - except SerialException as e: - logging.error(f"Ошибка подключения: {e}", exc_info=True) - except Exception as e: - logging.error(f"Неизвестная ошибка подключения: {e}", exc_info=True) - return None - - - -# Проверка наличия логина и пароля в настройках и отправка их на устройство - -def send_login_password(serial_connection, login=None, password=None, is_gui=False): - """Отправка логина и пароля на устройство.""" - if not login: - if is_gui: - login = simpledialog.askstring("Login", "Введите логин:") - if login is None: - login = "" - else: - login = input("Введите логин: ") - - if not password: - if is_gui: - password = simpledialog.askstring("Password", "Введите пароль:", show="*") - if password is None: - password = "" - else: - password = getpass("Введите пароль: ") - - -# Чтение ответа от устройства с учётом таймаута. - -def read_response(serial_connection, timeout, login=None, password=None, is_gui=False): - """ - Чтение ответа от устройства с учётом таймаута. - Если в последней строке появляется запрос логина или пароля, данные отправляются автоматически. - """ - response = b"" - end_time = time.time() + timeout - decoded = "" - while time.time() < end_time: - if serial_connection.in_waiting: - chunk = serial_connection.read(serial_connection.in_waiting) - response += chunk - - if b"--More--" in response: - serial_connection.write(b"\n") - response = response.replace(b"--More--", b"") - - try: - decoded = response.decode(errors="ignore") - except Exception: - decoded = "" - lines = decoded.rstrip().splitlines() - if lines: - last_line = lines[-1].strip() - - if re.search(r'(login:|username:)$', last_line, re.IGNORECASE): - send_login_password(serial_connection, login, None, is_gui) - response = b"" - continue - - if re.search(r'(password:)$', last_line, re.IGNORECASE): - send_login_password(serial_connection, None, password, is_gui) - response = b"" - continue - - if last_line.endswith(">") or last_line.endswith("#"): - break - else: - time.sleep(0.1) - return decoded - -def generate_command_blocks(lines, block_size): - """Генерация блоков команд для блочного копирования.""" - blocks = [] - current_block = [] - for line in lines: - trimmed = line.strip() - if not trimmed: - continue - lower_line = trimmed.lower() - if lower_line.startswith("vlan") or lower_line.startswith("enable") or lower_line.startswith("interface"): - if current_block: - blocks.append("\n".join(current_block)) - current_block = [] - blocks.append(trimmed) - elif lower_line.startswith("exit"): - current_block.append(trimmed) - blocks.append("\n".join(current_block)) - current_block = [] - else: - current_block.append(trimmed) - if len(current_block) >= block_size: - blocks.append("\n".join(current_block)) - current_block = [] - if current_block: - blocks.append("\n".join(current_block)) - return blocks - -def execute_commands_from_file( - serial_connection, - filename, - timeout, - copy_mode, - block_size, - log_callback=None, - login=None, - password=None, - is_gui=False, -): - """ - Выполнение команд из файла конфигурации. - Если передан log_callback, вывод будет отображаться в GUI. - Теперь при обнаружении ошибки (например, если в ответе присутствует символ '^') - команда будет отправляться повторно. - """ - try: - with open(filename, "r", encoding="utf-8") as file: - lines = [line for line in file if line.strip()] - msg = f"Выполнение команд из файла: {filename}\n" - logging.info(msg) - if log_callback: - log_callback(msg) - # Если выбран построчный режим - if copy_mode == "line": - for cmd in lines: - cmd = cmd.strip() - max_attempts = 3 - attempt = 0 - while attempt < max_attempts: - msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n" - if log_callback: - log_callback(msg) - serial_connection.write((cmd + "\n").encode()) - logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})") - response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui) - if response: - if '^' in response: - msg = ( - f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n" - f"Ответ устройства:\n{response}\n" - f"Повторная отправка команды...\n" - ) - if log_callback: - log_callback(msg) - logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.") - attempt += 1 - time.sleep(1) - continue - else: - msg = f"Ответ устройства:\n{response}\n" - if log_callback: - log_callback(msg) - logging.info(f"Ответ устройства:\n{response}") - break - else: - msg = f"Ответ не получен для команды: {cmd}\n" - if log_callback: - log_callback(msg) - logging.warning(f"Нет ответа для команды: {cmd}") - break - if attempt == max_attempts: - msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n" - if log_callback: - log_callback(msg) - logging.error(msg) - time.sleep(1) - # Если выбран блочный режим - elif copy_mode == "block": - blocks = generate_command_blocks(lines, block_size) - for block in blocks: - msg = f"\nОтправка блока команд:\n{block}\n" - if log_callback: - log_callback(msg) - serial_connection.write((block + "\n").encode()) - logging.info(f"Отправлен блок команд:\n{block}") - response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui) - # Если обнаружена ошибка в ответе на блок, отправляем команды по очереди - if response and '^' in response: - msg = ( - f"[WARNING] Обнаружена ошибка при выполнении блока команд.\n" - f"Ответ устройства:\n{response}\n" - f"Пересылаются команды по отдельности...\n" - ) - if log_callback: - log_callback(msg) - logging.warning("Ошибка в блочном режиме – отправляем команды индивидуально.") - for line in block.splitlines(): - cmd = line.strip() - if not cmd: - continue - max_attempts = 3 - attempt = 0 - while attempt < max_attempts: - sub_msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n" - if log_callback: - log_callback(sub_msg) - serial_connection.write((cmd + "\n").encode()) - logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})") - sub_response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui) - if sub_response: - if '^' in sub_response: - sub_msg = ( - f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n" - f"Ответ устройства:\n{sub_response}\n" - f"Повторная отправка команды...\n" - ) - if log_callback: - log_callback(sub_msg) - logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.") - attempt += 1 - time.sleep(1) - continue - else: - sub_msg = f"Ответ устройства:\n{sub_response}\n" - if log_callback: - log_callback(sub_msg) - logging.info(f"Ответ устройства:\n{sub_response}") - break - else: - sub_msg = f"Ответ не получен для команды: {cmd}\n" - if log_callback: - log_callback(sub_msg) - logging.warning(f"Нет ответа для команды: {cmd}") - break - if attempt == max_attempts: - sub_msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n" - if log_callback: - log_callback(sub_msg) - logging.error(sub_msg) - time.sleep(1) - else: - if response: - msg = f"Ответ устройства:\n{response}\n" - if log_callback: - log_callback(msg) - logging.info(f"Ответ устройства:\n{response}") - else: - msg = f"Ответ не получен для блока:\n{block}\n" - if log_callback: - log_callback(msg) - logging.warning(f"Нет ответа для блока:\n{block}") - time.sleep(1) - except SerialException as e: - msg = f"Ошибка при выполнении команды: {e}\n" - if log_callback: - log_callback(msg) - logging.error(f"Ошибка при выполнении команды: {e}", exc_info=True) - except Exception as e: - msg = f"Ошибка при выполнении команд: {e}\n" - if log_callback: - log_callback(msg) - logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True) - -# ========================== -# Улучшенные текстовые виджеты -# ========================== - -class CustomText(tk.Text): - """Улучшенный текстовый виджет с расширенной функциональностью копирования/вставки""" - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.create_context_menu() - self.bind_shortcuts() - - def create_context_menu(self): - self.context_menu = tk.Menu(self, tearoff=0) - self.context_menu.add_command(label="Вырезать", command=self.cut) - self.context_menu.add_command(label="Копировать", command=self.copy) - self.context_menu.add_command(label="Вставить", command=self.paste) - self.context_menu.add_separator() - self.context_menu.add_command(label="Выделить всё", command=self.select_all) - - self.bind("", self.show_context_menu) - - def show_context_menu(self, event): - self.context_menu.post(event.x_root, event.y_root) - - def bind_shortcuts(self): - # Стандартные сочетания - self.bind("", lambda e: self.event_generate("<>")) - self.bind("", lambda e: self.event_generate("<>")) - self.bind("", lambda e: self.event_generate("<>")) - self.bind("", self.select_all) - - # Shift+Insert для вставки - self.bind("", lambda e: self.event_generate("<>")) - - # Ctrl+Insert для копирования - self.bind("", lambda e: self.event_generate("<>")) - - # Shift+Delete для вырезания - self.bind("", lambda e: self.event_generate("<>")) - - def cut(self): - self.event_generate("<>") - - def copy(self): - self.event_generate("<>") - - def paste(self): - self.event_generate("<>") - - def select_all(self, event=None): - self.tag_add(tk.SEL, "1.0", tk.END) - self.mark_set(tk.INSERT, "1.0") - self.see(tk.INSERT) - return "break" - -class CustomEntry(ttk.Entry): - """Улучшенное поле ввода с расширенной функциональностью копирования/вставки""" - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.create_context_menu() - self.bind_shortcuts() - - def create_context_menu(self): - self.context_menu = tk.Menu(self, tearoff=0) - self.context_menu.add_command(label="Вырезать", command=self.cut) - self.context_menu.add_command(label="Копировать", command=self.copy) - self.context_menu.add_command(label="Вставить", command=self.paste) - self.context_menu.add_separator() - self.context_menu.add_command(label="Выделить всё", command=self.select_all) - - self.bind("", self.show_context_menu) - - def show_context_menu(self, event): - self.context_menu.post(event.x_root, event.y_root) - - def bind_shortcuts(self): - # Стандартные сочетания - self.bind("", lambda e: self.event_generate("<>")) - self.bind("", lambda e: self.event_generate("<>")) - self.bind("", lambda e: self.event_generate("<>")) - self.bind("", self.select_all) - - # Shift+Insert для вставки - self.bind("", lambda e: self.event_generate("<>")) - - # Ctrl+Insert для копирования - self.bind("", lambda e: self.event_generate("<>")) - - # Shift+Delete для вырезания - self.bind("", lambda e: self.event_generate("<>")) - - def cut(self): - self.event_generate("<>") - - def copy(self): - self.event_generate("<>") - - def paste(self): - self.event_generate("<>") - - def select_all(self, event=None): - self.select_range(0, tk.END) - return "break" - -class SettingsWindow(tk.Toplevel): - def __init__(self, parent, settings, callback=None): - super().__init__(parent) - self.title("Настройки") - self.geometry("600x400") - self.settings = settings - self.callback = callback - self.resizable(False, False) - - # Создаем фрейм для настроек - settings_frame = ttk.Frame(self, padding="10") - settings_frame.pack(fill=BOTH, expand=True) - - # COM порт - ttk.Label(settings_frame, text="COM порт:").grid(row=0, column=0, sticky=W, pady=5) - self.port_var = StringVar(value=settings.get("port", "")) - self.port_combo = ttk.Combobox(settings_frame, textvariable=self.port_var) - self.port_combo.grid(row=0, column=1, sticky=W, pady=5) - ttk.Button(settings_frame, text="Обновить", command=self.update_ports).grid(row=0, column=2, padx=5) - - # Скорость передачи - ttk.Label(settings_frame, text="Скорость:").grid(row=1, column=0, sticky=W, pady=5) - self.baudrate_var = StringVar(value=str(settings.get("baudrate", 9600))) - baudrate_combo = ttk.Combobox(settings_frame, textvariable=self.baudrate_var, - values=["9600", "19200", "38400", "57600", "115200"]) - baudrate_combo.grid(row=1, column=1, sticky=W, pady=5) - - # Таймаут - ttk.Label(settings_frame, text="Таймаут (сек):").grid(row=2, column=0, sticky=W, pady=5) - self.timeout_var = StringVar(value=str(settings.get("timeout", 10))) - timeout_entry = ttk.Entry(settings_frame, textvariable=self.timeout_var) - timeout_entry.grid(row=2, column=1, sticky=W, pady=5) - - # Режим копирования - ttk.Label(settings_frame, text="Режим копирования:").grid(row=3, column=0, sticky=W, pady=5) - self.copy_mode_var = StringVar(value=settings.get("copy_mode", "line")) - copy_mode_frame = ttk.Frame(settings_frame) - copy_mode_frame.grid(row=3, column=1, sticky=W, pady=5) - ttk.Radiobutton(copy_mode_frame, text="Построчно", value="line", - variable=self.copy_mode_var).pack(side=LEFT) - ttk.Radiobutton(copy_mode_frame, text="Блоками", value="block", - variable=self.copy_mode_var).pack(side=LEFT) - - # Размер блока - ttk.Label(settings_frame, text="Размер блока:").grid(row=4, column=0, sticky=W, pady=5) - self.block_size_var = StringVar(value=str(settings.get("block_size", 15))) - block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var) - block_size_entry.grid(row=4, column=1, sticky=W, pady=5) - - # Приглашение командной строки - ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5) - self.prompt_var = StringVar(value=settings.get("prompt", ">")) - prompt_entry = CustomEntry(settings_frame, textvariable=self.prompt_var) - prompt_entry.grid(row=5, column=1, sticky=W, pady=5) - - # Кнопки - button_frame = ttk.Frame(settings_frame) - button_frame.grid(row=6, column=0, columnspan=3, pady=20) - ttk.Button(button_frame, text="Сохранить", command=self.save_settings).pack(side=LEFT, padx=5) - ttk.Button(button_frame, text="Отмена", command=self.destroy).pack(side=LEFT, padx=5) - - self.update_ports() - - # Центрируем окно - self.center_window() - - def center_window(self): - self.update_idletasks() - width = self.winfo_width() - height = self.winfo_height() - x = (self.winfo_screenwidth() // 2) - (width // 2) - y = (self.winfo_screenheight() // 2) - (height // 2) - self.geometry(f"{width}x{height}+{x}+{y}") - - def update_ports(self): - ports = list_serial_ports() - self.port_combo["values"] = ports - if ports and not self.port_var.get(): - self.port_var.set(ports[0]) - - def save_settings(self): - try: - self.settings.update({ - "port": self.port_var.get(), - "baudrate": int(self.baudrate_var.get()), - "timeout": int(self.timeout_var.get()), - "copy_mode": self.copy_mode_var.get(), - "block_size": int(self.block_size_var.get()), - "prompt": self.prompt_var.get() - }) - settings_save(self.settings) - if self.callback: - self.callback() - self.destroy() - messagebox.showinfo("Успех", "Настройки успешно сохранены") - except ValueError as e: - messagebox.showerror("Ошибка", "Проверьте правильность введенных значений") - -class SerialAppGUI(tk.Tk): - def __init__(self, settings): - super().__init__() - self.title("ComConfigCopy") - self.geometry("900x700") - - # Добавляем VERSION как атрибут класса - self.VERSION = VERSION - - # Инициализация проверки обновлений - self.update_checker = UpdateChecker( - VERSION, - "https://gitea.filow.ru/LowaSC/ComConfigCopy" - ) - - # Настройка стиля - self.style = ttk.Style(self) - self.style.theme_use("clam") - default_font = ("Segoe UI", 10) - self.option_add("*Font", default_font) - - self.settings = settings - self.connection = None - self.tftp_server = None - - self.create_menu() - self.create_tabs() - - # Проверка первого запуска - self.check_first_run() - - def create_menu(self): - menubar = tk.Menu(self) - self.config(menu=menubar) - - # Меню "Файл" - file_menu = tk.Menu(menubar, tearoff=0) - menubar.add_cascade(label="Файл", menu=file_menu) - file_menu.add_command(label="Настройки", command=self.open_settings) - file_menu.add_separator() - file_menu.add_command(label="Выход", command=self.quit) - - # Меню "Справка" - help_menu = tk.Menu(menubar, tearoff=0) - menubar.add_cascade(label="Справка", menu=help_menu) - help_menu.add_command(label="Проверить обновления", command=self.check_for_updates) - help_menu.add_separator() - help_menu.add_command(label="О программе", command=self.open_about) - - def check_for_updates(self): - """Проверка наличия обновлений""" - def on_update_check(update_available, error): - if error: - messagebox.showerror( - "Ошибка проверки обновлений", - f"Не удалось проверить наличие обновлений:\n{error}" - ) - elif update_available: - release_info = self.update_checker.get_release_notes() - if release_info: - response = messagebox.askyesno( - "Доступно обновление", - f"Доступна новая версия {release_info['version']}!\n\n" - f"Изменения:\n{release_info['description']}\n\n" - "Хотите перейти на страницу загрузки?", - ) - if response: - webbrowser.open(release_info["download_url"]) - else: - messagebox.showerror( - "Ошибка", - "Не удалось получить информацию о новой версии" - ) - else: - messagebox.showinfo( - "Проверка обновлений", - "У вас установлена последняя версия программы" - ) - - self.update_checker.check_updates(callback=on_update_check) - - def on_settings_changed(self): - """Обработчик изменения настроек""" - self.settings = settings_load() - - def open_settings(self): - """Открытие окна настроек""" - settings_window = SettingsWindow(self, self.settings, self.on_settings_changed) - settings_window.transient(self) - settings_window.grab_set() - - def check_first_run(self): - """Проверка первого запуска программы""" - show_settings = False - - # Проверяем существование файла настроек - if not os.path.exists(SETTINGS_FILE): - show_settings = True - else: - # Проверяем содержимое файла настроек - try: - with open(SETTINGS_FILE, "r", encoding="utf-8") as f: - settings = json.load(f) - # Если порт не настроен, считаем это первым запуском - if settings.get("port") is None: - show_settings = True - except Exception: - # Если файл поврежден или не читается, тоже показываем настройки - show_settings = True - - if show_settings: - # Создаем папку Settings, если её нет - os.makedirs("Settings", exist_ok=True) - - response = messagebox.askyesno( - "Первый запуск", - "Это первый запуск программы. Хотите настроить параметры подключения сейчас?" - ) - if response: - self.open_settings() - - def create_tabs(self): - self.notebook = ttk.Notebook(self) - self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5) - - # Создаем вкладки - interactive_frame = ttk.Frame(self.notebook) - file_exec_frame = ttk.Frame(self.notebook) - config_editor_frame = ttk.Frame(self.notebook) - tftp_frame = ttk.Frame(self.notebook) - - self.notebook.add(interactive_frame, text="Интерактивный режим") - self.notebook.add(file_exec_frame, text="Выполнение файла") - self.notebook.add(config_editor_frame, text="Редактор конфигурации") - self.notebook.add(tftp_frame, text="TFTP Сервер") - - self.create_interactive_tab(interactive_frame) - self.create_file_exec_tab(file_exec_frame) - self.create_config_editor_tab(config_editor_frame) - self.create_tftp_tab(tftp_frame) - - # -------------- Вкладка "Интерактивный режим" -------------- - def create_interactive_tab(self, frame): - control_frame = ttk.Frame(frame) - control_frame.pack(fill=X, pady=5) - ttk.Button(control_frame, text="Подключиться", command=self.connect_device).pack(side=LEFT, padx=5) - ttk.Button(control_frame, text="Отключиться", command=self.disconnect_device).pack(side=LEFT, padx=5) - - self.interactive_text = CustomText(frame, wrap="word", height=20) - self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5) - - input_frame = ttk.Frame(frame) - input_frame.pack(fill=X, pady=5) - ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5) - self.command_entry = CustomEntry(input_frame, width=50) - self.command_entry.pack(side=LEFT, padx=5) - ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5) - - def connect_device(self): - if self.connection: - messagebox.showinfo("Информация", "Уже подключено.") - return - if not self.settings.get("port"): - messagebox.showerror("Ошибка", "COM-порт не выбран!") - return - self.connection = create_connection(self.settings) - if self.connection: - self.append_interactive_text("[INFO] Подключение установлено.\n") - else: - self.append_interactive_text("[ERROR] Не удалось установить соединение.\n") - - def disconnect_device(self): - if self.connection: - try: - self.connection.close() - except Exception: - pass - self.connection = None - self.append_interactive_text("[INFO] Соединение закрыто.\n") - else: - messagebox.showinfo("Информация", "Соединение не установлено.") - - def send_command(self): - if not self.connection: - messagebox.showerror("Ошибка", "Сначала установите соединение!") - return - cmd = self.command_entry.get().strip() - if not cmd: - return - self.append_interactive_text(f"[INFO] Отправка команды: {cmd}\n") - threading.Thread(target=self.process_command, args=(cmd,), daemon=True).start() - - def process_command(self, cmd): - try: - max_attempts = 3 - attempt = 0 - while attempt < max_attempts: - self.connection.write((cmd + "\n").encode()) - logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})") - response = read_response( - self.connection, - self.settings.get("timeout", 10), - login=self.settings.get("login"), - password=self.settings.get("password"), - is_gui=True, - ) - if response: - if '^' in response: - self.append_interactive_text( - f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n" - f"Ответ устройства:\n{response}\n" - f"Повторная отправка команды...\n" - ) - logging.warning(f"Ошибка в команде: {cmd}. Попытка повторной отправки.") - attempt += 1 - time.sleep(1) - continue - else: - self.append_interactive_text(f"[INFO] Ответ устройства:\n{response}\n") - logging.info(f"Получен ответ:\n{response}") - break - else: - self.append_interactive_text("[WARN] Ответ не получен.\n") - logging.warning("Нет ответа от устройства в течение таймаута.") - break - if attempt == max_attempts: - self.append_interactive_text(f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n") - logging.error(f"Команда не выполнена корректно после {max_attempts} попыток: {cmd}") - except SerialException as e: - self.append_interactive_text(f"[ERROR] Ошибка при отправке команды: {e}\n") - logging.error(f"Ошибка отправки команды: {e}", exc_info=True) - except Exception as e: - self.append_interactive_text(f"[ERROR] Неизвестная ошибка: {e}\n") - logging.error(f"Неизвестная ошибка: {e}", exc_info=True) - - def append_interactive_text(self, text): - self.interactive_text.insert(END, text) - self.interactive_text.see(END) - - # -------------- Вкладка "Выполнить команды из файла" -------------- - def create_file_exec_tab(self, frame): - file_frame = ttk.Frame(frame) - file_frame.pack(fill=X, pady=5) - ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5) - self.file_exec_var = StringVar(value=self.settings.get("config_file") or "") - CustomEntry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5) - ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5) - ttk.Button(frame, text="Выполнить команды", command=self.execute_file_commands).pack(pady=5) - self.file_exec_text = CustomText(frame, wrap="word", height=15) - self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5) - - def select_config_file_fileexec(self): - filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")]) - if filename: - self.file_exec_var.set(filename) - - def execute_file_commands(self): - if not self.settings.get("port"): - messagebox.showerror("Ошибка", "COM-порт не выбран!") - return - if not self.file_exec_var.get(): - messagebox.showerror("Ошибка", "Файл конфигурации не выбран!") - return - if not self.connection: - self.connection = create_connection(self.settings) - if not self.connection: - self.append_file_exec_text("[ERROR] Не удалось установить соединение.\n") - return - threading.Thread( - target=execute_commands_from_file, - args=( - self.connection, - self.file_exec_var.get(), - self.settings.get("timeout", 10), - self.settings.get("copy_mode", "line"), - self.settings.get("block_size", 15), - self.append_file_exec_text, - self.settings.get("login"), - self.settings.get("password"), - True, - ), - daemon=True, - ).start() - - def append_file_exec_text(self, text): - self.file_exec_text.insert(END, text) - self.file_exec_text.see(END) - - # -------------- Вкладка "Редактор конфигурационного файла" -------------- - def create_config_editor_tab(self, frame): - top_frame = ttk.Frame(frame) - top_frame.pack(fill=X, pady=5) - ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5) - self.editor_file_var = StringVar(value=self.settings.get("config_file") or "") - CustomEntry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5) - ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5) - ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5) - ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5) - self.config_editor_text = CustomText(frame, wrap="word") - self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5) - - def select_config_file_editor(self): - filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")]) - if filename: - self.editor_file_var.set(filename) - self.settings["config_file"] = filename - settings_save(self.settings) - - def load_config_file(self): - filename = self.editor_file_var.get() - if not filename or not os.path.exists(filename): - messagebox.showerror("Ошибка", "Файл конфигурации не выбран или не существует.") - return - try: - with open(filename, "r", encoding="utf-8") as f: - content = f.read() - self.config_editor_text.delete("1.0", END) - self.config_editor_text.insert(END, content) - messagebox.showinfo("Информация", "Файл загружен.") - except Exception as e: - logging.error(f"Ошибка загрузки файла: {e}", exc_info=True) - messagebox.showerror("Ошибка", f"Не удалось загрузить файл:\n{e}") - - def save_config_file(self): - filename = self.editor_file_var.get() - if not filename: - messagebox.showerror("Ошибка", "Файл конфигурации не выбран.") - return - try: - content = self.config_editor_text.get("1.0", END) - with open(filename, "w", encoding="utf-8") as f: - f.write(content) - messagebox.showinfo("Информация", "Файл сохранён.") - except Exception as e: - logging.error(f"Ошибка сохранения файла: {e}", exc_info=True) - messagebox.showerror("Ошибка", f"Не удалось сохранить файл:\n{e}") - - def open_about(self): - about_window = AboutWindow(self) - about_window.transient(self) - about_window.grab_set() - - def create_tftp_tab(self, frame): - """Создание вкладки TFTP сервера.""" - # Создаем фрейм для управления TFTP сервером - tftp_frame = ttk.Frame(frame) - tftp_frame.pack(fill=BOTH, expand=True, padx=5, pady=5) - - # Создаем и размещаем элементы управления - controls_frame = ttk.LabelFrame(tftp_frame, text="Управление TFTP сервером") - controls_frame.pack(fill=X, padx=5, pady=5) - - # IP адрес - ip_frame = ttk.Frame(controls_frame) - ip_frame.pack(fill=X, padx=5, pady=2) - ttk.Label(ip_frame, text="IP адрес:").pack(side=LEFT, padx=5) - self.tftp_ip_var = StringVar(value="0.0.0.0") - self.tftp_ip_combo = ttk.Combobox(ip_frame, textvariable=self.tftp_ip_var, state="readonly") - self.tftp_ip_combo.pack(side=LEFT, fill=X, expand=True, padx=5) - ttk.Button(ip_frame, text="Обновить", command=self.update_network_adapters).pack(side=LEFT, padx=5) - - # Заполняем список адаптеров - self.update_network_adapters() - - # Порт - port_frame = ttk.Frame(controls_frame) - port_frame.pack(fill=X, padx=5, pady=2) - ttk.Label(port_frame, text="Порт:").pack(side=LEFT, padx=5) - self.tftp_port_var = StringVar(value="69") - self.tftp_port_entry = CustomEntry(port_frame, textvariable=self.tftp_port_var) - self.tftp_port_entry.pack(fill=X, expand=True, padx=5) - - # Кнопки управления - buttons_frame = ttk.Frame(controls_frame) - buttons_frame.pack(fill=X, padx=5, pady=5) - - self.start_tftp_button = ttk.Button( - buttons_frame, - text="Запустить сервер", - command=self.start_tftp_server - ) - self.start_tftp_button.pack(side=LEFT, padx=5) - - self.stop_tftp_button = ttk.Button( - buttons_frame, - text="Остановить сервер", - command=self.stop_tftp_server, - state="disabled" - ) - self.stop_tftp_button.pack(side=LEFT, padx=5) - - # Лог сервера - log_frame = ttk.LabelFrame(tftp_frame, text="Лог сервера") - log_frame.pack(fill=BOTH, expand=True, padx=5, pady=5) - - self.tftp_log_text = CustomText(log_frame, wrap=tk.WORD, height=10) - self.tftp_log_text.pack(fill=BOTH, expand=True, padx=5, pady=5) - - # Добавляем скроллбар для лога - scrollbar = ttk.Scrollbar(self.tftp_log_text, command=self.tftp_log_text.yview) - scrollbar.pack(side=tk.RIGHT, fill=tk.Y) - self.tftp_log_text.config(yscrollcommand=scrollbar.set) - - # Статус передач - transfers_frame = ttk.LabelFrame(tftp_frame, text="Активные передачи") - transfers_frame.pack(fill=BOTH, expand=True, padx=5, pady=5) - - # Создаем таблицу для отображения активных передач - columns = ("client", "filename", "progress", "remaining", "time") - self.transfers_tree = ttk.Treeview(transfers_frame, columns=columns, show="headings") - - # Настраиваем заголовки колонок - self.transfers_tree.heading("client", text="Клиент") - self.transfers_tree.heading("filename", text="Файл") - self.transfers_tree.heading("progress", text="Прогресс") - self.transfers_tree.heading("remaining", text="Осталось") - self.transfers_tree.heading("time", text="Время") - - # Настраиваем ширину колонок - self.transfers_tree.column("client", width=120) - self.transfers_tree.column("filename", width=150) - self.transfers_tree.column("progress", width=100) - self.transfers_tree.column("remaining", width=100) - self.transfers_tree.column("time", width=80) - - self.transfers_tree.pack(fill=BOTH, expand=True, padx=5, pady=5) - - # Инициализация TFTP сервера - self.tftp_server = None - self.tftp_server_thread = None - - def start_tftp_server(self): - """Запуск TFTP сервера.""" - try: - # Получаем выбранный IP-адрес - ip = self.tftp_ip_var.get() - if not ip: - messagebox.showerror("Ошибка", "Выберите IP-адрес для TFTP сервера") - return - - # Проверяем корректность порта - try: - port = int(self.tftp_port_var.get()) - if port <= 0 or port > 65535: - raise ValueError("Порт должен быть в диапазоне 1-65535") - except ValueError as e: - messagebox.showerror("Ошибка", f"Некорректный порт: {str(e)}") - return - - # Создаем экземпляр TFTP сервера - self.tftp_server = TFTPServer("Firmware") - - # Устанавливаем callback для логирования - def log_callback(message): - # Фильтруем дублирующиеся сообщения о запуске/остановке сервера - if "[INFO] TFTP сервер запущен" in message and hasattr(self, '_server_started'): - return - if "[INFO] TFTP сервер остановлен" in message and hasattr(self, '_server_stopped'): - return - - self.append_tftp_log(message) - - # Устанавливаем флаги для отслеживания состояния - if "[INFO] TFTP сервер запущен" in message: - self._server_started = True - elif "[INFO] TFTP сервер остановлен" in message: - self._server_stopped = True - - # Обновляем информацию о передачах - self.update_transfers_info() - - self.tftp_server.set_log_callback(log_callback) - - # Запускаем сервер в отдельном потоке - self.tftp_server_thread = threading.Thread( - target=self.run_tftp_server, - args=(ip, port), - daemon=True - ) - self.tftp_server_thread.start() - - # Обновляем состояние кнопок и элементов управления - self.start_tftp_button.config(state="disabled") - self.stop_tftp_button.config(state="normal") - self.tftp_ip_combo.config(state="disabled") - self.tftp_port_entry.config(state="disabled") - - # Запускаем периодическое обновление информации о передачах - self.update_transfers_periodically() - - except Exception as e: - self.append_tftp_log(f"[ERROR] Ошибка запуска сервера: {str(e)}") - messagebox.showerror("Ошибка", f"Не удалось запустить TFTP сервер: {str(e)}") - - def run_tftp_server(self, ip, port): - """Запуск TFTP сервера в отдельном потоке.""" - try: - self.tftp_server.start_server(ip, port) - except Exception as e: - self.append_tftp_log(f"[ERROR] Ошибка работы сервера: {str(e)}") - - def stop_tftp_server(self): - """Остановка TFTP сервера.""" - if self.tftp_server: - try: - # Отключаем кнопки на время остановки сервера - self.start_tftp_button.config(state="disabled") - self.stop_tftp_button.config(state="disabled") - - # Сбрасываем флаги состояния - if hasattr(self, '_server_started'): - delattr(self, '_server_started') - if hasattr(self, '_server_stopped'): - delattr(self, '_server_stopped') - - # Останавливаем сервер - self.tftp_server.stop_server() - - # Ждем завершения потока сервера с таймаутом - if self.tftp_server_thread: - self.tftp_server_thread.join(timeout=5.0) - if self.tftp_server_thread.is_alive(): - self.append_tftp_log("[WARN] Превышено время ожидания остановки сервера") - - # Очищаем ссылки на сервер и поток - self.tftp_server = None - self.tftp_server_thread = None - - # Обновляем состояние кнопок - self.start_tftp_button.config(state="normal") - self.stop_tftp_button.config(state="disabled") - self.tftp_ip_combo.config(state="normal") - self.tftp_port_entry.config(state="normal") - - # Очищаем таблицу передач - for item in self.transfers_tree.get_children(): - self.transfers_tree.delete(item) - - except Exception as e: - self.append_tftp_log(f"[ERROR] Ошибка остановки сервера: {str(e)}") - messagebox.showerror("Ошибка", f"Не удалось остановить TFTP сервер: {str(e)}") - - # Восстанавливаем состояние кнопок в случае ошибки - self.start_tftp_button.config(state="disabled") - self.stop_tftp_button.config(state="normal") - - def append_tftp_log(self, message): - """Добавление сообщения в лог TFTP сервера.""" - self.tftp_log_text.insert(END, message + "\n") - self.tftp_log_text.see(END) - - def update_transfers_info(self): - """Обновление информации об активных передачах.""" - if not self.tftp_server: - return - - # Очищаем текущие записи - for item in self.transfers_tree.get_children(): - self.transfers_tree.delete(item) - - # Добавляем информацию о текущих передачах - for client_addr, transfer_info in self.tftp_server.active_transfers.items(): - filename = transfer_info['filename'] - bytes_sent = transfer_info['bytes_sent'] - filesize = transfer_info['filesize'] - start_time = transfer_info['start_time'] - - # Вычисляем прогресс - progress = f"{bytes_sent}/{filesize} байт" - remaining_bytes = filesize - bytes_sent - elapsed_time = time.time() - start_time - - # Вычисляем скорость передачи (байт/сек) - if elapsed_time > 0: - transfer_speed = bytes_sent / elapsed_time - # Вычисляем оставшееся время - if transfer_speed > 0: - remaining_time = remaining_bytes / transfer_speed - remaining_str = f"{remaining_bytes} байт (~{int(remaining_time)}с)" - else: - remaining_str = f"{remaining_bytes} байт (неизвестно)" - else: - remaining_str = f"{remaining_bytes} байт (вычисляется...)" - - # Добавляем запись в таблицу - self.transfers_tree.insert("", END, values=( - f"{client_addr[0]}:{client_addr[1]}", - filename, - progress, - remaining_str, - f"{elapsed_time:.1f}с" - )) - - def update_transfers_periodically(self): - """Периодическое обновление информации о передачах.""" - if self.tftp_server and self.tftp_server.running: - self.update_transfers_info() - # Планируем следующее обновление через 1 секунду - self.after(1000, self.update_transfers_periodically) - - def update_network_adapters(self): - """Обновление списка сетевых адаптеров.""" - adapters = get_network_adapters() - self.tftp_ip_combo["values"] = adapters - if not self.tftp_ip_var.get() in adapters: - self.tftp_ip_var.set(adapters[0]) - -# ========================== -# Основной запуск приложения -# ========================== -def main(): - setup_logging() - settings = settings_load() - app = SerialAppGUI(settings) - app.mainloop() - -# ========================== -# Основной запуск приложения -# ========================== -if __name__ == "__main__": - try: - main() - except KeyboardInterrupt: - logging.info("Программа прервана пользователем (KeyboardInterrupt).") - sys.exit(0) - except Exception as e: - logging.critical(f"Неизвестная ошибка: {e}", exc_info=True) - sys.exit(1) diff --git a/TFTPServer.py b/TFTPServer.py deleted file mode 100644 index 6d74095..0000000 --- a/TFTPServer.py +++ /dev/null @@ -1,349 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -TFTP сервер для передачи прошивки с компьютера на коммутатор. - -- Создает сервер по заданному IP и порту. -- Расшаривает папку Firmware. -- Показывает текущее состояние сервера и статус передачи файла: - - кому (IP устройства), - - сколько осталось байт, - - сколько передано байт, - - время передачи. -""" - -import os -import socket -import struct -import threading -import time - -class TFTPServer: - def __init__(self, share_folder): - """ - Инициализация TFTP сервера. - - :param share_folder: Путь к папке, содержащей файлы (например, папка 'Firmware') - """ - self.share_folder = share_folder - self.log_callback = None - self.running = False - self.server_socket = None - self.lock = threading.Lock() - self.transfer_sockets = set() # Множество для хранения всех активных сокетов передачи - # Словарь активных передач для мониторинга их статуса. - # Ключ – адрес клиента, значение – словарь с информацией о передаче. - self.active_transfers = {} - - def set_log_callback(self, callback): - """ - Установка функции обратного вызова для логирования сообщений. - - :param callback: Функция, принимающая строку сообщения. - """ - self.log_callback = callback - - def log(self, message): - """ - Функция логирования: вызывает callback (если задан) или выводит сообщение в консоль. - - :param message: Строка с сообщением для логирования. - """ - if self.log_callback: - self.log_callback(message) - else: - print(message) - - def start_server(self, ip, port): - """ - Запуск TFTP сервера на указанном IP и порту. - - :param ip: IP-адрес для привязки сервера. - :param port: Порт для TFTP сервера. - """ - if self.running: - self.log("[WARN] Сервер уже запущен") - return - - self.running = True - try: - self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.server_socket.bind((ip, port)) - self.log(f"[INFO] TFTP сервер запущен на {ip}:{port}") - - while self.running: - try: - self.server_socket.settimeout(1.0) - data, client_addr = self.server_socket.recvfrom(2048) - if data and self.running: - threading.Thread(target=self.handle_request, args=(data, client_addr), daemon=True).start() - except socket.timeout: - continue - except socket.error as e: - if self.running: # Логируем ошибку только если сервер еще запущен - self.log(f"[ERROR] Ошибка получения данных: {str(e)}") - break - except Exception as e: - if self.running: # Логируем ошибку только если сервер еще запущен - self.log(f"[ERROR] Ошибка получения данных: {str(e)}") - break - except Exception as e: - self.log(f"[ERROR] Ошибка запуска сервера: {str(e)}") - finally: - self.running = False - if self.server_socket: - try: - self.server_socket.close() - except: - pass - self.server_socket = None - - def stop_server(self): - """ - Остановка TFTP сервера. - """ - if not self.running: - return - - self.log("[INFO] Остановка TFTP сервера...") - self.running = False - - try: - # Закрываем основной сокет сервера первым - if self.server_socket: - try: - # Создаем временный сокет и отправляем пакет самому себе, - # чтобы разблокировать recvfrom - temp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - try: - server_address = self.server_socket.getsockname() - temp_socket.sendto(b'', server_address) - except: - pass - finally: - try: - temp_socket.close() - except: - pass - - try: - self.server_socket.close() - except Exception as e: - self.log(f"[WARN] Ошибка при закрытии основного сокета: {str(e)}") - finally: - self.server_socket = None - - # Закрываем все активные сокеты передачи - with self.lock: - active_sockets = list(self.transfer_sockets) - self.transfer_sockets.clear() - active_transfers = dict(self.active_transfers) - self.active_transfers.clear() - - # Закрываем сокеты передачи после очистки множества - for sock in active_sockets: - try: - if sock: - sock.close() - except Exception as e: - self.log(f"[WARN] Ошибка при закрытии сокета передачи: {str(e)}") - - # Отправляем сообщения об остановке для активных передач - for client_addr, transfer_info in active_transfers.items(): - try: - self.send_error(client_addr, 0, "Сервер остановлен") - except: - pass - - except Exception as e: - self.log(f"[ERROR] Ошибка при остановке сервера: {str(e)}") - finally: - self.running = False # Гарантируем, что флаг running будет False - self.log("[INFO] TFTP сервер остановлен") - - def handle_request(self, data, client_addr): - """ - Обработка входящего запроса от клиента. - - :param data: Полученные данные (UDP-пакет). - :param client_addr: Адрес клиента, отправившего пакет. - """ - if len(data) < 2: - self.log(f"[WARN] Получен некорректный пакет от {client_addr}") - return - opcode = struct.unpack("!H", data[:2])[0] - if opcode == 1: # RRQ (Read Request) – запрос на чтение файла - self.handle_rrq(data, client_addr) - else: - self.log(f"[WARN] Неподдерживаемый запрос (опкод {opcode}) от {client_addr}") - - def handle_rrq(self, data, client_addr): - """ - Обработка запроса на чтение файла (RRQ). - - :param data: Данные запроса. - :param client_addr: Адрес клиента. - """ - try: - # RRQ формата: 2 байта опкода, затем строка имени файла, за которой следует 0, - # затем строка режима (например, "octet"), и завершается 0. - parts = data[2:].split(b'\0') - if len(parts) < 2: - self.log(f"[WARN] Некорректный RRQ пакет от {client_addr}") - return - filename = parts[0].decode('utf-8') - mode = parts[1].decode('utf-8').lower() - self.log(f"[INFO] Получен RRQ от {client_addr}: файл '{filename}', режим '{mode}'") - if mode != "octet": - self.send_error(client_addr, 0, "Поддерживается только octet режим") - return - file_path = os.path.join(self.share_folder, filename) - if not os.path.isfile(file_path): - self.send_error(client_addr, 1, "Файл не найден") - return - # Запускаем передачу файла в новом потоке. - threading.Thread(target=self.send_file, args=(file_path, client_addr), daemon=True).start() - except Exception as e: - self.log(f"[ERROR] Ошибка обработки RRQ: {str(e)}") - - def send_error(self, client_addr, error_code, error_message): - """ - Отправка сообщения об ошибке клиенту. - - :param client_addr: Адрес клиента. - :param error_code: Код ошибки. - :param error_message: Текст ошибки. - """ - # Формируем TFTP пакет ошибки: 2 байта опкода (5), 2 байта кода ошибки, сообщение об ошибке и завершающий 0. - packet = struct.pack("!HH", 5, error_code) + error_message.encode('utf-8') + b'\0' - self.server_socket.sendto(packet, client_addr) - self.log(f"[INFO] Отправлено сообщение об ошибке '{error_message}' клиенту {client_addr}") - - def send_file(self, file_path, client_addr): - """ - Передача файла клиенту по протоколу TFTP. - """ - BLOCK_SIZE = 512 - MAX_RETRIES = 5 - TIMEOUT = 2.0 - transfer_socket = None - try: - if not os.path.exists(file_path): - self.log(f"[ERROR] Файл '{file_path}' не существует") - self.send_error(client_addr, 1, "Файл не найден") - return - - filesize = os.path.getsize(file_path) - if filesize == 0: - self.log(f"[ERROR] Файл '{file_path}' пуст") - self.send_error(client_addr, 0, "Файл пуст") - return - - start_time = time.time() - file_basename = os.path.basename(file_path) - - # Регистрируем активную передачу - with self.lock: - self.active_transfers[client_addr] = { - 'filename': file_basename, - 'filesize': filesize, - 'bytes_sent': 0, - 'start_time': start_time - } - - # Создаем новый сокет для передачи данных - transfer_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - transfer_socket.settimeout(TIMEOUT) - - with self.lock: - self.transfer_sockets.add(transfer_socket) - - self.log(f"[INFO] Начало передачи файла '{file_basename}' клиенту {client_addr}. Размер файла: {filesize} байт.") - - with open(file_path, 'rb') as file: - block_number = 1 - last_successful_block = 0 - - while True: - # Читаем блок данных - data = file.read(BLOCK_SIZE) - - # Формируем и отправляем пакет данных - packet = struct.pack('!HH', 3, block_number) + data - - retries = 0 - while retries < MAX_RETRIES: - try: - transfer_socket.sendto(packet, client_addr) - - # Ожидаем подтверждение - while True: - try: - ack_data, ack_addr = transfer_socket.recvfrom(4) - if ack_addr == client_addr and len(ack_data) >= 4: - opcode, ack_block = struct.unpack('!HH', ack_data) - if opcode == 4: # ACK - if ack_block == block_number: - # Успешное подтверждение - last_successful_block = block_number - bytes_sent = min((block_number * BLOCK_SIZE), filesize) - - # Обновляем информацию о прогрессе - with self.lock: - if client_addr in self.active_transfers: - self.active_transfers[client_addr]['bytes_sent'] = bytes_sent - - # Логируем статус каждую секунду - current_time = time.time() - if current_time - start_time >= 1.0: - bytes_remaining = filesize - bytes_sent - elapsed_time = current_time - start_time - self.log(f"[STATUS] Клиент: {client_addr} | Файл: {file_basename} | " - f"Отправлено: {bytes_sent}/{filesize} байт | " - f"Осталось: {bytes_remaining} байт | " - f"Время: {elapsed_time:.2f} сек.") - - break - elif ack_block < block_number: - # Получен старый ACK, игнорируем - continue - except socket.timeout: - break - - if last_successful_block == block_number: - break - else: - retries += 1 - self.log(f"[WARN] Таймаут ожидания ACK для блока {block_number} от {client_addr}. " - f"Попытка {retries + 1}.") - except Exception as e: - retries += 1 - self.log(f"[ERROR] Ошибка при передаче блока {block_number}: {str(e)}") - - if retries >= MAX_RETRIES: - self.log(f"[ERROR] Превышено максимальное количество попыток для блока {block_number}") - return - - block_number += 1 - - # Если отправили меньше BLOCK_SIZE байт, это был последний блок - if len(data) < BLOCK_SIZE: - break - - self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} завершена успешно") - - except Exception as e: - self.log(f"[ERROR] Ошибка при передаче файла: {str(e)}") - finally: - # Очищаем информацию о передаче - with self.lock: - if client_addr in self.active_transfers: - del self.active_transfers[client_addr] - if transfer_socket in self.transfer_sockets: - self.transfer_sockets.remove(transfer_socket) - - if transfer_socket: - try: - transfer_socket.close() - except: - pass \ No newline at end of file diff --git a/about_window.py b/about_window.py deleted file mode 100644 index 1d3240e..0000000 --- a/about_window.py +++ /dev/null @@ -1,103 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -import tkinter as tk -from tkinter import ttk, BOTH, X, BOTTOM, END -import webbrowser - -class AboutWindow(tk.Toplevel): - def __init__(self, parent): - super().__init__(parent) - self.title("О программе") - self.geometry("600x500") - self.resizable(False, False) - - # Сохраняем ссылку на родительское окно - self.parent = parent - - # Создаем фрейм для содержимого - about_frame = ttk.Frame(self, padding="20") - about_frame.pack(fill=BOTH, expand=True, padx=10, pady=10) - - # Заголовок - ttk.Label( - about_frame, - text="ComConfigCopy", - font=("Segoe UI", 16, "bold") - ).pack(pady=(0, 10)) - - # Описание - ttk.Label( - about_frame, - text="Программа для копирования конфигураций на коммутаторы", - wraplength=350 - ).pack(pady=(0, 20)) - - # Версия - ttk.Label( - about_frame, - text=f"Версия {getattr(parent, 'VERSION', '1.0.0')}", - font=("Segoe UI", 10) - ).pack(pady=(0, 20)) - - # Контактная информация - contact_frame = ttk.Frame(about_frame) - contact_frame.pack(fill=X, pady=(0, 20)) - - # Исходный код - ttk.Label( - contact_frame, - text="Исходный код:", - font=("Segoe UI", 10, "bold") - ).pack(anchor="w") - - source_link = ttk.Label( - contact_frame, - text="https://gitea.filow.ru/LowaSC/ComConfigCopy", - cursor="hand2", - foreground="blue" - ) - source_link.pack(anchor="w") - source_link.bind("", lambda e: self.open_url("https://gitea.filow.ru/LowaSC/ComConfigCopy")) - - # Контакты - ttk.Label( - contact_frame, - text="\nКонтакты:", - font=("Segoe UI", 10, "bold") - ).pack(anchor="w") - - ttk.Label( - contact_frame, - text="Email: LowaWorkMail@gmail.com" - ).pack(anchor="w") - - telegram_link = ttk.Label( - contact_frame, - text="Telegram: @LowaSC", - cursor="hand2", - foreground="blue" - ) - telegram_link.pack(anchor="w") - telegram_link.bind("", lambda e: self.open_url("https://t.me/LowaSC")) - - # Кнопка закрытия - ttk.Button( - self, - text="Закрыть", - command=self.destroy - ).pack(side=BOTTOM, pady=10) - - # Центрируем окно - self.center_window() - - def center_window(self): - self.update_idletasks() - width = self.winfo_width() - height = self.winfo_height() - x = (self.winfo_screenwidth() // 2) - (width // 2) - y = (self.winfo_screenheight() // 2) - (height // 2) - self.geometry(f"{width}x{height}+{x}+{y}") - - def open_url(self, url): - webbrowser.open(url) \ No newline at end of file diff --git a/update_checker.py b/update_checker.py deleted file mode 100644 index 137bf09..0000000 --- a/update_checker.py +++ /dev/null @@ -1,175 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -import json -import logging -import requests -import threading -from packaging import version - -class UpdateCheckError(Exception): - """Исключение для ошибок проверки обновлений""" - pass - -class UpdateChecker: - """Класс для проверки обновлений программы""" - - def __init__(self, current_version, repo_url): - self.current_version = current_version - self.repo_url = repo_url - # Формируем базовый URL API - self.api_url = repo_url.replace("gitea.filow.ru", "gitea.filow.ru/api/v1/repos/LowaSC/ComConfigCopy") - self._update_available = False - self._latest_version = None - self._latest_release = None - self._error = None - self._changelog = None - - def get_changelog(self, callback=None): - """ - Получение changelog из репозитория. - :param callback: Функция обратного вызова, которая будет вызвана после получения changelog - """ - def fetch(): - try: - # Пытаемся получить CHANGELOG.md из репозитория - response = requests.get(f"{self.api_url}/contents/CHANGELOG.md", timeout=10) - response.raise_for_status() - - content = response.json() - if "content" in content: - import base64 - changelog_content = base64.b64decode(content["content"]).decode("utf-8") - self._changelog = changelog_content - self._error = None - else: - raise UpdateCheckError("Не удалось получить содержимое CHANGELOG.md") - - except requests.RequestException as e: - error_msg = f"Ошибка получения changelog: {e}" - logging.error(error_msg, exc_info=True) - self._error = error_msg - self._changelog = None - except Exception as e: - error_msg = f"Неизвестная ошибка при получении changelog: {e}" - logging.error(error_msg, exc_info=True) - self._error = error_msg - self._changelog = None - finally: - if callback: - callback(self._changelog, self._error) - - # Запускаем получение в отдельном потоке - threading.Thread(target=fetch, daemon=True).start() - - def check_updates(self, callback=None): - """ - Проверка наличия обновлений. - :param callback: Функция обратного вызова, которая будет вызвана после проверки - """ - def check(): - try: - response = requests.get(f"{self.api_url}/releases", timeout=10) - response.raise_for_status() - - releases = response.json() - if not releases: - raise UpdateCheckError("Не найдено релизов в репозитории") - - latest_release = releases[0] - latest_version = latest_release.get("tag_name", "").lstrip("v") - - if not latest_version: - raise UpdateCheckError("Не удалось определить версию последнего релиза") - - try: - if version.parse(latest_version) > version.parse(self.current_version): - self._update_available = True - self._latest_version = latest_version - self._latest_release = latest_release - logging.info(f"Доступно обновление: {latest_version}") - else: - logging.info("Обновления не требуются") - except version.InvalidVersion as e: - raise UpdateCheckError(f"Некорректный формат версии: {e}") - - self._error = None - - except requests.RequestException as e: - error_msg = f"Ошибка сетевого подключения: {e}" - logging.error(error_msg, exc_info=True) - self._error = error_msg - except UpdateCheckError as e: - logging.error(str(e), exc_info=True) - self._error = str(e) - except Exception as e: - error_msg = f"Неизвестная ошибка при проверке обновлений: {e}" - logging.error(error_msg, exc_info=True) - self._error = error_msg - finally: - if callback: - callback(self._update_available, self._error) - - @property - def update_available(self): - """Доступно ли обновление""" - return self._update_available - - @property - def latest_version(self): - """Последняя доступная версия""" - return self._latest_version - - @property - def error(self): - """Последняя ошибка при проверке обновлений""" - return self._error - - @property - def changelog(self): - """Текущий changelog""" - return self._changelog - - def get_release_notes(self): - """Получение информации о последнем релизе""" - if self._latest_release: - return { - "version": self._latest_version, - "description": self._latest_release.get("body", ""), - "download_url": self._latest_release.get("assets", [{}])[0].get("browser_download_url", "") - } - return None - - def get_releases(self, callback=None): - """ - Получение списка релизов из репозитория. - :param callback: Функция обратного вызова, которая будет вызвана после получения списка релизов - """ - def fetch(): - try: - response = requests.get(f"{self.api_url}/releases", timeout=10) - response.raise_for_status() - releases = response.json() - - if not releases: - raise UpdateCheckError("Не найдено релизов в репозитории") - - self._error = None - if callback: - callback(releases, None) - - except requests.RequestException as e: - error_msg = f"Ошибка сетевого подключения: {e}" - logging.error(error_msg, exc_info=True) - self._error = error_msg - if callback: - callback(None, error_msg) - except Exception as e: - error_msg = f"Ошибка при получении списка релизов: {e}" - logging.error(error_msg, exc_info=True) - self._error = error_msg - if callback: - callback(None, error_msg) - - # Запускаем получение в отдельном потоке - threading.Thread(target=fetch, daemon=True).start() \ No newline at end of file