Files
ComConfigCopy/ComConfigCopy.py
Lowa 299ce329f7 Add TFTP server functionality to the application
- Implement TFTP server tab with IP and port configuration
- Create methods to start and stop TFTP server
- Add logging functionality for TFTP server events
- Integrate TFTPServer class into the main application
- Re-enable Firmware directory creation
2025-02-16 02:51:47 +03:00

1010 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ------------------------------------------------------------
# Это программа для копирования конфигураций на коммутаторы
# ------------------------------------------------------------
# import argparse Использовался для получения аргументов из командной строки (не используется)
# import platform Использовался для получения списка сетевых адаптеров (не используется)
# import subprocess Использовался для получения списка сетевых адаптеров (не используется)
# import socket не используется
import json
import logging
import os
import re
import sys
import threading
import time
from getpass import getpass
from logging.handlers import RotatingFileHandler
import tkinter as tk
from tkinter import (
StringVar,
END,
BOTH,
LEFT,
X,
W,
filedialog,
messagebox,
simpledialog,
)
from tkinter import ttk
import serial
import serial.tools.list_ports
from serial.serialutil import SerialException
from about_window import AboutWindow
from TFTPServer import TFTPServer
# from TFTPServer import TFTPServerThread
# Создаем необходимые папки
os.makedirs("Logs", exist_ok=True)
os.makedirs("Configs", exist_ok=True)
os.makedirs("Settings", exist_ok=True)
os.makedirs("Firmware", exist_ok=True)
# Файл настроек находится в папке Settings
SETTINGS_FILE = os.path.join("Settings", "settings.json")
# ==========================
# Функции работы с настройками и логированием
# ==========================
def setup_logging():
"""Настройка логирования с использованием RotatingFileHandler."""
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
log_path = os.path.join("Logs", "app.log")
handler = RotatingFileHandler(
log_path, maxBytes=5 * 1024 * 1024, backupCount=3, encoding="utf-8"
)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
def settings_load():
"""Загрузка настроек из JSON-файла или создание настроек по умолчанию."""
default_settings = {
"port": None, # Порт для подключения
"baudrate": 9600, # Скорость передачи данных
"config_file": None, # Файл конфигурации
"login": None, # Логин для подключения
"password": None, # Пароль для подключения
"timeout": 10, # Таймаут подключения
"copy_mode": "line", # Режим копирования
"block_size": 15, # Размер блока команд
"prompt": ">", # Используется для определения приглашения
}
# Создаем папку Settings, если её нет
os.makedirs("Settings", exist_ok=True)
if not os.path.exists(SETTINGS_FILE):
try:
# При первом запуске создаем новый файл настроек
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
json.dump(default_settings, f, indent=4, ensure_ascii=False)
logging.info("Файл настроек создан с настройками по умолчанию.")
return default_settings
except Exception as e:
logging.error(f"Ошибка при создании файла настроек: {e}", exc_info=True)
return default_settings
try:
# Пытаемся загрузить существующие настройки
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
settings = json.load(f)
# Проверяем наличие всех необходимых параметров
settings_changed = False
for key, value in default_settings.items():
if key not in settings:
settings[key] = value
settings_changed = True
# Если были добавлены новые параметры, сохраняем обновленные настройки
if settings_changed:
try:
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
json.dump(settings, f, indent=4, ensure_ascii=False)
logging.info("Файл настроек обновлен с новыми параметрами.")
except Exception as e:
logging.error(f"Ошибка при обновлении файла настроек: {e}", exc_info=True)
return settings
except Exception as e:
logging.error(f"Ошибка загрузки файла настроек: {e}", exc_info=True)
return default_settings
def settings_save(settings):
"""Сохранение настроек в JSON-файл."""
try:
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
json.dump(settings, f, indent=4, ensure_ascii=False)
logging.info("Настройки сохранены в файл.")
except Exception as e:
logging.error(f"Ошибка при сохранении настроек: {e}", exc_info=True)
def list_serial_ports():
"""Получение списка доступных последовательных портов."""
ports = serial.tools.list_ports.comports()
logging.debug(f"Найдено {len(ports)} серийных портов.")
return [port.device for port in ports]
# ==========================
# Функции работы с сетевыми адаптерами (не используются)
# ==========================
# def list_network_adapters():
# """Возвращает список названий сетевых адаптеров (Windows)."""
# adapters = []
# if platform.system() == "Windows":
# try:
# output = subprocess.check_output(
# 'wmic nic get NetConnectionID',
# shell=True,
# encoding="cp866"
# )
# for line in output.splitlines():
# line = line.strip()
# if line and line != "NetConnectionID":
# adapters.append(line)
# except Exception as e:
# logging.error(f"Ошибка при получении списка адаптеров: {e}", exc_info=True)
# else:
# adapters = ["eth0"]
# return adapters
# ==========================
# Функции работы с COM-соединением
# ==========================
def create_connection(settings):
"""Создание соединения с устройством через последовательный порт."""
try:
conn = serial.Serial(settings["port"], settings["baudrate"], timeout=1)
logging.info(f"Соединение установлено с {settings['port']} на {settings['baudrate']}.")
time.sleep(1)
return conn
except SerialException as e:
logging.error(f"Ошибка подключения: {e}", exc_info=True)
except Exception as e:
logging.error(f"Неизвестная ошибка подключения: {e}", exc_info=True)
return None
# Проверка наличия логина и пароля в настройках и отправка их на устройство
def send_login_password(serial_connection, login=None, password=None, is_gui=False):
"""Отправка логина и пароля на устройство."""
if not login:
if is_gui:
login = simpledialog.askstring("Login", "Введите логин:")
if login is None:
login = ""
else:
login = input("Введите логин: ")
if not password:
if is_gui:
password = simpledialog.askstring("Password", "Введите пароль:", show="*")
if password is None:
password = ""
else:
password = getpass("Введите пароль: ")
# Чтение ответа от устройства с учётом таймаута.
def read_response(serial_connection, timeout, login=None, password=None, is_gui=False):
"""
Чтение ответа от устройства с учётом таймаута.
Если в последней строке появляется запрос логина или пароля, данные отправляются автоматически.
"""
response = b""
end_time = time.time() + timeout
decoded = ""
while time.time() < end_time:
if serial_connection.in_waiting:
chunk = serial_connection.read(serial_connection.in_waiting)
response += chunk
if b"--More--" in response:
serial_connection.write(b"\n")
response = response.replace(b"--More--", b"")
try:
decoded = response.decode(errors="ignore")
except Exception:
decoded = ""
lines = decoded.rstrip().splitlines()
if lines:
last_line = lines[-1].strip()
if re.search(r'(login:|username:)$', last_line, re.IGNORECASE):
send_login_password(serial_connection, login, None, is_gui)
response = b""
continue
if re.search(r'(password:)$', last_line, re.IGNORECASE):
send_login_password(serial_connection, None, password, is_gui)
response = b""
continue
if last_line.endswith(">") or last_line.endswith("#"):
break
else:
time.sleep(0.1)
return decoded
def generate_command_blocks(lines, block_size):
"""Генерация блоков команд для блочного копирования."""
blocks = []
current_block = []
for line in lines:
trimmed = line.strip()
if not trimmed:
continue
lower_line = trimmed.lower()
if lower_line.startswith("vlan") or lower_line.startswith("enable") or lower_line.startswith("interface"):
if current_block:
blocks.append("\n".join(current_block))
current_block = []
blocks.append(trimmed)
elif lower_line.startswith("exit"):
current_block.append(trimmed)
blocks.append("\n".join(current_block))
current_block = []
else:
current_block.append(trimmed)
if len(current_block) >= block_size:
blocks.append("\n".join(current_block))
current_block = []
if current_block:
blocks.append("\n".join(current_block))
return blocks
def execute_commands_from_file(
serial_connection,
filename,
timeout,
copy_mode,
block_size,
log_callback=None,
login=None,
password=None,
is_gui=False,
):
"""
Выполнение команд из файла конфигурации.
Если передан log_callback, вывод будет отображаться в GUI.
Теперь при обнаружении ошибки (например, если в ответе присутствует символ '^')
команда будет отправляться повторно.
"""
try:
with open(filename, "r", encoding="utf-8") as file:
lines = [line for line in file if line.strip()]
msg = f"Выполнение команд из файла: {filename}\n"
logging.info(msg)
if log_callback:
log_callback(msg)
# Если выбран построчный режим
if copy_mode == "line":
for cmd in lines:
cmd = cmd.strip()
max_attempts = 3
attempt = 0
while attempt < max_attempts:
msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n"
if log_callback:
log_callback(msg)
serial_connection.write((cmd + "\n").encode())
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})")
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
if response:
if '^' in response:
msg = (
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
f"Ответ устройства:\n{response}\n"
f"Повторная отправка команды...\n"
)
if log_callback:
log_callback(msg)
logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.")
attempt += 1
time.sleep(1)
continue
else:
msg = f"Ответ устройства:\n{response}\n"
if log_callback:
log_callback(msg)
logging.info(f"Ответ устройства:\n{response}")
break
else:
msg = f"Ответ не получен для команды: {cmd}\n"
if log_callback:
log_callback(msg)
logging.warning(f"Нет ответа для команды: {cmd}")
break
if attempt == max_attempts:
msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n"
if log_callback:
log_callback(msg)
logging.error(msg)
time.sleep(1)
# Если выбран блочный режим
elif copy_mode == "block":
blocks = generate_command_blocks(lines, block_size)
for block in blocks:
msg = f"\nОтправка блока команд:\n{block}\n"
if log_callback:
log_callback(msg)
serial_connection.write((block + "\n").encode())
logging.info(f"Отправлен блок команд:\n{block}")
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
# Если обнаружена ошибка в ответе на блок, отправляем команды по очереди
if response and '^' in response:
msg = (
f"[WARNING] Обнаружена ошибка при выполнении блока команд.\n"
f"Ответ устройства:\n{response}\n"
f"Пересылаются команды по отдельности...\n"
)
if log_callback:
log_callback(msg)
logging.warning("Ошибка в блочном режиме отправляем команды индивидуально.")
for line in block.splitlines():
cmd = line.strip()
if not cmd:
continue
max_attempts = 3
attempt = 0
while attempt < max_attempts:
sub_msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n"
if log_callback:
log_callback(sub_msg)
serial_connection.write((cmd + "\n").encode())
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})")
sub_response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
if sub_response:
if '^' in sub_response:
sub_msg = (
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
f"Ответ устройства:\n{sub_response}\n"
f"Повторная отправка команды...\n"
)
if log_callback:
log_callback(sub_msg)
logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.")
attempt += 1
time.sleep(1)
continue
else:
sub_msg = f"Ответ устройства:\n{sub_response}\n"
if log_callback:
log_callback(sub_msg)
logging.info(f"Ответ устройства:\n{sub_response}")
break
else:
sub_msg = f"Ответ не получен для команды: {cmd}\n"
if log_callback:
log_callback(sub_msg)
logging.warning(f"Нет ответа для команды: {cmd}")
break
if attempt == max_attempts:
sub_msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n"
if log_callback:
log_callback(sub_msg)
logging.error(sub_msg)
time.sleep(1)
else:
if response:
msg = f"Ответ устройства:\n{response}\n"
if log_callback:
log_callback(msg)
logging.info(f"Ответ устройства:\n{response}")
else:
msg = f"Ответ не получен для блока:\n{block}\n"
if log_callback:
log_callback(msg)
logging.warning(f"Нет ответа для блока:\n{block}")
time.sleep(1)
except SerialException as e:
msg = f"Ошибка при выполнении команды: {e}\n"
if log_callback:
log_callback(msg)
logging.error(f"Ошибка при выполнении команды: {e}", exc_info=True)
except Exception as e:
msg = f"Ошибка при выполнении команд: {e}\n"
if log_callback:
log_callback(msg)
logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True)
# ==========================
# Графический интерфейс (Tkinter)
# ==========================
class SettingsWindow(tk.Toplevel):
def __init__(self, parent, settings, callback=None):
super().__init__(parent)
self.title("Настройки")
self.geometry("600x400")
self.settings = settings
self.callback = callback
self.resizable(False, False)
# Создаем фрейм для настроек
settings_frame = ttk.Frame(self, padding="10")
settings_frame.pack(fill=BOTH, expand=True)
# COM порт
ttk.Label(settings_frame, text="COM порт:").grid(row=0, column=0, sticky=W, pady=5)
self.port_var = StringVar(value=settings.get("port", ""))
self.port_combo = ttk.Combobox(settings_frame, textvariable=self.port_var)
self.port_combo.grid(row=0, column=1, sticky=W, pady=5)
ttk.Button(settings_frame, text="Обновить", command=self.update_ports).grid(row=0, column=2, padx=5)
# Скорость передачи
ttk.Label(settings_frame, text="Скорость:").grid(row=1, column=0, sticky=W, pady=5)
self.baudrate_var = StringVar(value=str(settings.get("baudrate", 9600)))
baudrate_combo = ttk.Combobox(settings_frame, textvariable=self.baudrate_var,
values=["9600", "19200", "38400", "57600", "115200"])
baudrate_combo.grid(row=1, column=1, sticky=W, pady=5)
# Таймаут
ttk.Label(settings_frame, text="Таймаут (сек):").grid(row=2, column=0, sticky=W, pady=5)
self.timeout_var = StringVar(value=str(settings.get("timeout", 10)))
timeout_entry = ttk.Entry(settings_frame, textvariable=self.timeout_var)
timeout_entry.grid(row=2, column=1, sticky=W, pady=5)
# Режим копирования
ttk.Label(settings_frame, text="Режим копирования:").grid(row=3, column=0, sticky=W, pady=5)
self.copy_mode_var = StringVar(value=settings.get("copy_mode", "line"))
copy_mode_frame = ttk.Frame(settings_frame)
copy_mode_frame.grid(row=3, column=1, sticky=W, pady=5)
ttk.Radiobutton(copy_mode_frame, text="Построчно", value="line",
variable=self.copy_mode_var).pack(side=LEFT)
ttk.Radiobutton(copy_mode_frame, text="Блоками", value="block",
variable=self.copy_mode_var).pack(side=LEFT)
# Размер блока
ttk.Label(settings_frame, text="Размер блока:").grid(row=4, column=0, sticky=W, pady=5)
self.block_size_var = StringVar(value=str(settings.get("block_size", 15)))
block_size_entry = ttk.Entry(settings_frame, textvariable=self.block_size_var)
block_size_entry.grid(row=4, column=1, sticky=W, pady=5)
# Приглашение командной строки
ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5)
self.prompt_var = StringVar(value=settings.get("prompt", ">"))
prompt_entry = ttk.Entry(settings_frame, textvariable=self.prompt_var)
prompt_entry.grid(row=5, column=1, sticky=W, pady=5)
# Кнопки
button_frame = ttk.Frame(settings_frame)
button_frame.grid(row=6, column=0, columnspan=3, pady=20)
ttk.Button(button_frame, text="Сохранить", command=self.save_settings).pack(side=LEFT, padx=5)
ttk.Button(button_frame, text="Отмена", command=self.destroy).pack(side=LEFT, padx=5)
self.update_ports()
# Центрируем окно
self.center_window()
def center_window(self):
self.update_idletasks()
width = self.winfo_width()
height = self.winfo_height()
x = (self.winfo_screenwidth() // 2) - (width // 2)
y = (self.winfo_screenheight() // 2) - (height // 2)
self.geometry(f"{width}x{height}+{x}+{y}")
def update_ports(self):
ports = list_serial_ports()
self.port_combo["values"] = ports
if ports and not self.port_var.get():
self.port_var.set(ports[0])
def save_settings(self):
try:
self.settings.update({
"port": self.port_var.get(),
"baudrate": int(self.baudrate_var.get()),
"timeout": int(self.timeout_var.get()),
"copy_mode": self.copy_mode_var.get(),
"block_size": int(self.block_size_var.get()),
"prompt": self.prompt_var.get()
})
settings_save(self.settings)
if self.callback:
self.callback()
self.destroy()
messagebox.showinfo("Успех", "Настройки успешно сохранены")
except ValueError as e:
messagebox.showerror("Ошибка", "Проверьте правильность введенных значений")
class SerialAppGUI(tk.Tk):
def __init__(self, settings):
super().__init__()
self.title("Serial Device Manager")
self.geometry("900x700")
self.style = ttk.Style(self)
self.style.theme_use("clam")
default_font = ("Segoe UI", 10)
self.option_add("*Font", default_font)
self.settings = settings
self.connection = None
self.tftp_server = None
# Глобальные биндинги
self.bind_class("Text", "<Control-c>", lambda event: event.widget.event_generate("<<Copy>>"))
self.bind_class("Text", "<Control-v>", lambda event: event.widget.event_generate("<<Paste>>"))
self.bind_class("Text", "<Control-x>", lambda event: event.widget.event_generate("<<Cut>>"))
self.bind_class("Entry", "<Control-c>", lambda event: event.widget.event_generate("<<Copy>>"))
self.bind_class("Entry", "<Control-v>", lambda event: event.widget.event_generate("<<Paste>>"))
self.bind_class("Entry", "<Control-x>", lambda event: event.widget.event_generate("<<Cut>>"))
self.create_menu()
self.create_tabs()
# Проверка первого запуска
self.check_first_run()
def check_first_run(self):
"""Проверка первого запуска программы"""
show_settings = False
# Проверяем существование файла настроек
if not os.path.exists(SETTINGS_FILE):
show_settings = True
else:
# Проверяем содержимое файла настроек
try:
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
settings = json.load(f)
# Если порт не настроен, считаем это первым запуском
if settings.get("port") is None:
show_settings = True
except Exception:
# Если файл поврежден или не читается, тоже показываем настройки
show_settings = True
if show_settings:
# Создаем папку Settings, если её нет
os.makedirs("Settings", exist_ok=True)
response = messagebox.askyesno(
"Первый запуск",
"Это первый запуск программы. Хотите настроить параметры подключения сейчас?"
)
if response:
self.open_settings()
def create_menu(self):
menubar = tk.Menu(self)
self.config(menu=menubar)
# Меню "Файл"
file_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Файл", menu=file_menu)
file_menu.add_command(label="Настройки", command=self.open_settings)
file_menu.add_separator()
file_menu.add_command(label="Выход", command=self.quit)
# Меню "Справка"
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Справка", menu=help_menu)
help_menu.add_command(label="О программе", command=self.open_about)
def open_settings(self):
settings_window = SettingsWindow(self, self.settings, self.on_settings_changed)
settings_window.transient(self)
settings_window.grab_set()
def on_settings_changed(self):
# Обновляем настройки в основном приложении
self.settings = settings_load()
def create_tabs(self):
self.notebook = ttk.Notebook(self)
self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Создаем вкладки
interactive_frame = ttk.Frame(self.notebook)
file_exec_frame = ttk.Frame(self.notebook)
config_editor_frame = ttk.Frame(self.notebook)
tftp_frame = ttk.Frame(self.notebook)
self.notebook.add(interactive_frame, text="Интерактивный режим")
self.notebook.add(file_exec_frame, text="Выполнение файла")
self.notebook.add(config_editor_frame, text="Редактор конфигурации")
self.notebook.add(tftp_frame, text="TFTP Сервер")
self.create_interactive_tab(interactive_frame)
self.create_file_exec_tab(file_exec_frame)
self.create_config_editor_tab(config_editor_frame)
self.create_tftp_tab(tftp_frame)
# -------------- Вкладка "Интерактивный режим" --------------
def create_interactive_tab(self, frame):
control_frame = ttk.Frame(frame)
control_frame.pack(fill=X, pady=5)
ttk.Button(control_frame, text="Подключиться", command=self.connect_device).pack(side=LEFT, padx=5)
ttk.Button(control_frame, text="Отключиться", command=self.disconnect_device).pack(side=LEFT, padx=5)
self.interactive_text = tk.Text(frame, wrap="word", height=20)
self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
input_frame = ttk.Frame(frame)
input_frame.pack(fill=X, pady=5)
ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5)
self.command_entry = ttk.Entry(input_frame, width=50)
self.command_entry.pack(side=LEFT, padx=5)
ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5)
def connect_device(self):
if self.connection:
messagebox.showinfo("Информация", "Уже подключено.")
return
if not self.settings.get("port"):
messagebox.showerror("Ошибка", "COM-порт не выбран!")
return
self.connection = create_connection(self.settings)
if self.connection:
self.append_interactive_text("[INFO] Подключение установлено.\n")
else:
self.append_interactive_text("[ERROR] Не удалось установить соединение.\n")
def disconnect_device(self):
if self.connection:
try:
self.connection.close()
except Exception:
pass
self.connection = None
self.append_interactive_text("[INFO] Соединение закрыто.\n")
else:
messagebox.showinfo("Информация", "Соединение не установлено.")
def send_command(self):
if not self.connection:
messagebox.showerror("Ошибка", "Сначала установите соединение!")
return
cmd = self.command_entry.get().strip()
if not cmd:
return
self.append_interactive_text(f"[INFO] Отправка команды: {cmd}\n")
threading.Thread(target=self.process_command, args=(cmd,), daemon=True).start()
def process_command(self, cmd):
try:
max_attempts = 3
attempt = 0
while attempt < max_attempts:
self.connection.write((cmd + "\n").encode())
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})")
response = read_response(
self.connection,
self.settings.get("timeout", 10),
login=self.settings.get("login"),
password=self.settings.get("password"),
is_gui=True,
)
if response:
if '^' in response:
self.append_interactive_text(
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
f"Ответ устройства:\n{response}\n"
f"Повторная отправка команды...\n"
)
logging.warning(f"Ошибка в команде: {cmd}. Попытка повторной отправки.")
attempt += 1
time.sleep(1)
continue
else:
self.append_interactive_text(f"[INFO] Ответ устройства:\n{response}\n")
logging.info(f"Получен ответ:\n{response}")
break
else:
self.append_interactive_text("[WARN] Ответ не получен.\n")
logging.warning("Нет ответа от устройства в течение таймаута.")
break
if attempt == max_attempts:
self.append_interactive_text(f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n")
logging.error(f"Команда не выполнена корректно после {max_attempts} попыток: {cmd}")
except SerialException as e:
self.append_interactive_text(f"[ERROR] Ошибка при отправке команды: {e}\n")
logging.error(f"Ошибка отправки команды: {e}", exc_info=True)
except Exception as e:
self.append_interactive_text(f"[ERROR] Неизвестная ошибка: {e}\n")
logging.error(f"Неизвестная ошибка: {e}", exc_info=True)
def append_interactive_text(self, text):
self.interactive_text.insert(END, text)
self.interactive_text.see(END)
# -------------- Вкладка "Выполнить команды из файла" --------------
def create_file_exec_tab(self, frame):
file_frame = ttk.Frame(frame)
file_frame.pack(fill=X, pady=5)
ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
self.file_exec_var = StringVar(value=self.settings.get("config_file") or "")
ttk.Entry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5)
ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5)
ttk.Button(frame, text="Выполнить команды", command=self.execute_file_commands).pack(pady=5)
self.file_exec_text = tk.Text(frame, wrap="word", height=15)
self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
def select_config_file_fileexec(self):
filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")])
if filename:
self.file_exec_var.set(filename)
def execute_file_commands(self):
if not self.settings.get("port"):
messagebox.showerror("Ошибка", "COM-порт не выбран!")
return
if not self.file_exec_var.get():
messagebox.showerror("Ошибка", "Файл конфигурации не выбран!")
return
if not self.connection:
self.connection = create_connection(self.settings)
if not self.connection:
self.append_file_exec_text("[ERROR] Не удалось установить соединение.\n")
return
threading.Thread(
target=execute_commands_from_file,
args=(
self.connection,
self.file_exec_var.get(),
self.settings.get("timeout", 10),
self.settings.get("copy_mode", "line"),
self.settings.get("block_size", 15),
self.append_file_exec_text,
self.settings.get("login"),
self.settings.get("password"),
True,
),
daemon=True,
).start()
def append_file_exec_text(self, text):
self.file_exec_text.insert(END, text)
self.file_exec_text.see(END)
# -------------- Вкладка "Редактор конфигурационного файла" --------------
def create_config_editor_tab(self, frame):
top_frame = ttk.Frame(frame)
top_frame.pack(fill=X, pady=5)
ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
self.editor_file_var = StringVar(value=self.settings.get("config_file") or "")
ttk.Entry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5)
self.config_editor_text = tk.Text(frame, wrap="word")
self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
def select_config_file_editor(self):
filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")])
if filename:
self.editor_file_var.set(filename)
self.settings["config_file"] = filename
settings_save(self.settings)
def load_config_file(self):
filename = self.editor_file_var.get()
if not filename or not os.path.exists(filename):
messagebox.showerror("Ошибка", "Файл конфигурации не выбран или не существует.")
return
try:
with open(filename, "r", encoding="utf-8") as f:
content = f.read()
self.config_editor_text.delete("1.0", END)
self.config_editor_text.insert(END, content)
messagebox.showinfo("Информация", "Файл загружен.")
except Exception as e:
logging.error(f"Ошибка загрузки файла: {e}", exc_info=True)
messagebox.showerror("Ошибка", f"Не удалось загрузить файл:\n{e}")
def save_config_file(self):
filename = self.editor_file_var.get()
if not filename:
messagebox.showerror("Ошибка", "Файл конфигурации не выбран.")
return
try:
content = self.config_editor_text.get("1.0", END)
with open(filename, "w", encoding="utf-8") as f:
f.write(content)
messagebox.showinfo("Информация", "Файл сохранён.")
except Exception as e:
logging.error(f"Ошибка сохранения файла: {e}", exc_info=True)
messagebox.showerror("Ошибка", f"Не удалось сохранить файл:\n{e}")
def open_about(self):
about_window = AboutWindow(self)
about_window.transient(self)
about_window.grab_set()
def create_tftp_tab(self, frame):
# Создаем фрейм для управления TFTP сервером
control_frame = ttk.Frame(frame)
control_frame.pack(fill=X, pady=5)
# IP адрес
ip_frame = ttk.Frame(control_frame)
ip_frame.pack(fill=X, pady=5)
ttk.Label(ip_frame, text="IP адрес:").pack(side=LEFT, padx=5)
self.tftp_ip_var = StringVar(value="0.0.0.0")
ttk.Entry(ip_frame, textvariable=self.tftp_ip_var, width=15).pack(side=LEFT, padx=5)
# Порт
port_frame = ttk.Frame(control_frame)
port_frame.pack(fill=X, pady=5)
ttk.Label(port_frame, text="Порт:").pack(side=LEFT, padx=5)
self.tftp_port_var = StringVar(value="69")
ttk.Entry(port_frame, textvariable=self.tftp_port_var, width=6).pack(side=LEFT, padx=5)
# Кнопки управления
button_frame = ttk.Frame(control_frame)
button_frame.pack(fill=X, pady=5)
self.start_tftp_button = ttk.Button(button_frame, text="Запустить сервер", command=self.start_tftp_server)
self.start_tftp_button.pack(side=LEFT, padx=5)
self.stop_tftp_button = ttk.Button(button_frame, text="Остановить сервер", command=self.stop_tftp_server, state="disabled")
self.stop_tftp_button.pack(side=LEFT, padx=5)
# Лог TFTP сервера
log_frame = ttk.Frame(frame)
log_frame.pack(fill=BOTH, expand=True, pady=5)
ttk.Label(log_frame, text="Лог сервера:").pack(anchor=W, padx=5)
self.tftp_log_text = tk.Text(log_frame, wrap="word", height=15)
self.tftp_log_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
def start_tftp_server(self):
try:
ip = self.tftp_ip_var.get()
port = int(self.tftp_port_var.get())
if not self.tftp_server:
self.tftp_server = TFTPServer("Firmware")
# Устанавливаем функцию обратного вызова для логирования
def log_callback(message):
self.tftp_log_text.insert(END, f"{message}\n")
self.tftp_log_text.see(END)
self.tftp_server.set_log_callback(log_callback)
threading.Thread(
target=self.run_tftp_server,
args=(ip, port),
daemon=True
).start()
self.start_tftp_button.config(state="disabled")
self.stop_tftp_button.config(state="normal")
except ValueError:
messagebox.showerror("Ошибка", "Некорректный порт")
except Exception as e:
messagebox.showerror("Ошибка", f"Не удалось запустить TFTP сервер: {str(e)}")
def run_tftp_server(self, ip, port):
try:
self.tftp_server.start_server(ip, port)
except Exception as e:
self.tftp_log_text.insert(END, f"[ERROR] Ошибка TFTP сервера: {str(e)}\n")
self.stop_tftp_server()
def stop_tftp_server(self):
if self.tftp_server:
try:
self.tftp_server.stop_server()
self.tftp_server = None
self.tftp_log_text.insert(END, "[INFO] TFTP сервер остановлен\n")
except Exception as e:
self.tftp_log_text.insert(END, f"[ERROR] Ошибка при остановке TFTP сервера: {str(e)}\n")
finally:
self.start_tftp_button.config(state="normal")
self.stop_tftp_button.config(state="disabled")
# ==========================
# Парсер аргументов (не используется)
# ==========================
# def parse_arguments():
# parser = argparse.ArgumentParser(
# description="Программа для работы с устройствами через последовательный порт с графическим интерфейсом."
# )
# parser.add_argument("--cli", action="store_true", help="Запустить в режиме командной строки (без графики)")
# return parser.parse_args()
# ==========================
# Режим командной строки (не используется)
# ==========================
# def run_cli_mode(settings):
# print("Запущен режим командной строки.")
# while True:
# print("\n--- Главное меню ---")
# print("1. Подключиться и войти в интерактивный режим")
# print("2. Выполнить команды из файла конфигурации")
# print("0. Выход")
# choice = input("Выберите действие: ").strip()
# if choice == "1":
# if not settings.get("port"):
# print("[ERROR] COM-порт не выбран!")
# continue
# conn = create_connection(settings)
# if not conn:
# print("[ERROR] Не удалось установить соединение.")
# continue
# print("[INFO] Введите команды (для выхода введите _exit):")
# while True:
# cmd = input("Команда: ")
# if cmd.strip().lower() == "_exit":
# break
# try:
# conn.write((cmd + "\n").encode())
# response = read_response(
# conn, settings.get("timeout", 10),
# login=settings.get("login"),
# password=settings.get("password"),
# is_gui=False
# )
# print(response)
# except Exception as e:
# print(f"[ERROR] {e}")
# break
# conn.close()
# elif choice == "2":
# if not settings.get("config_file"):
# print("[ERROR] Файл конфигурации не выбран!")
# continue
# if not settings.get("port"):
# print("[ERROR] COM-порт не выбран!")
# continue
# conn = create_connection(settings)
# if conn:
# execute_commands_from_file(
# conn,
# settings["config_file"],
# settings.get("timeout", 10),
# settings.get("copy_mode", "line"),
# settings.get("block_size", 15),
# lambda msg: print(msg),
# login=settings.get("login"),
# password=settings.get("password"),
# is_gui=False,
# )
# conn.close()
# else:
# print("[ERROR] Не удалось установить соединение.")
# elif choice == "0":
# break
# else:
# print("[ERROR] Некорректный выбор.")
# ==========================
# Основной запуск приложения
# ==========================
def main():
setup_logging()
settings = settings_load()
app = SerialAppGUI(settings)
app.mainloop()
# ==========================
# Основной запуск приложения
# ==========================
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
logging.info("Программа прервана пользователем (KeyboardInterrupt).")
sys.exit(0)
except Exception as e:
logging.critical(f"Неизвестная ошибка: {e}", exc_info=True)
sys.exit(1)