885 lines
41 KiB
Python
885 lines
41 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
|
||
# Это программа для копирования конфигураций на коммутаторы
|
||
# Программа использует библиотеку PySerial для работы с последовательными портами
|
||
# Программа использует библиотеку PyQt5 для создания графического интерфейса
|
||
# Программа использует библиотеку PyQt5.QtWidgets для создания графического интерфейса
|
||
# Программа использует библиотеку PyQt5.QtCore для создания графического интерфейса
|
||
# Программа использует библиотеку PyQt5.QtGui для создания графического интерфейса
|
||
|
||
|
||
|
||
# import argparse Использовался для получения аргументов из командной строки
|
||
# import platform Использовался для получения списка сетевых адаптеров
|
||
# import subprocess Использовался для получения списка сетевых адаптеров
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import socket
|
||
import sys
|
||
import threading
|
||
import time
|
||
from getpass import getpass
|
||
from logging.handlers import RotatingFileHandler
|
||
import tkinter as tk
|
||
from tkinter import (
|
||
Tk,
|
||
Frame,
|
||
StringVar,
|
||
END,
|
||
BOTH,
|
||
LEFT,
|
||
X,
|
||
W,
|
||
filedialog,
|
||
messagebox,
|
||
simpledialog,
|
||
)
|
||
from tkinter import ttk
|
||
|
||
import serial
|
||
import serial.tools.list_ports
|
||
from serial.serialutil import SerialException
|
||
|
||
# Создаем необходимые папки
|
||
os.makedirs("Logs", exist_ok=True)
|
||
os.makedirs("Configs", exist_ok=True)
|
||
os.makedirs("Settings", exist_ok=True)
|
||
os.makedirs("Firmware", exist_ok=True)
|
||
|
||
# Файл настроек находится в папке Settings
|
||
SETTINGS_FILE = os.path.join("Settings", "settings.json")
|
||
|
||
# ==========================
|
||
# Функции работы с настройками и логированием
|
||
# ==========================
|
||
|
||
def setup_logging():
|
||
"""Настройка логирования с использованием RotatingFileHandler."""
|
||
logger = logging.getLogger()
|
||
logger.setLevel(logging.DEBUG)
|
||
log_path = os.path.join("Logs", "app.log")
|
||
handler = RotatingFileHandler(
|
||
log_path, maxBytes=5 * 1024 * 1024, backupCount=3, encoding="utf-8"
|
||
)
|
||
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
|
||
handler.setFormatter(formatter)
|
||
logger.addHandler(handler)
|
||
|
||
def settings_load():
|
||
"""Загрузка настроек из JSON-файла или создание настроек по умолчанию."""
|
||
default_settings = {
|
||
"port": None, # Порт для подключения
|
||
"baudrate": 9600, # Скорость передачи данных
|
||
"config_file": None, # Файл конфигурации
|
||
"login": None, # Логин для подключения
|
||
"password": None, # Пароль для подключения
|
||
"timeout": 10, # Таймаут подключения
|
||
"copy_mode": "line", # Режим копирования
|
||
"block_size": 15, # Размер блока команд
|
||
"prompt": ">", # Используется для определения приглашения
|
||
}
|
||
if not os.path.exists(SETTINGS_FILE):
|
||
try:
|
||
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(default_settings, f, indent=4, ensure_ascii=False)
|
||
logging.info("Файл настроек создан с настройками по умолчанию.")
|
||
except Exception as e:
|
||
logging.error(f"Ошибка при создании файла настроек: {e}", exc_info=True)
|
||
return default_settings
|
||
|
||
try:
|
||
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
|
||
settings = json.load(f)
|
||
for key, value in default_settings.items():
|
||
if key not in settings:
|
||
settings[key] = value
|
||
return settings
|
||
except Exception as e:
|
||
logging.error(f"Ошибка загрузки файла настроек: {e}", exc_info=True)
|
||
return default_settings
|
||
|
||
def settings_save(settings):
|
||
"""Сохранение настроек в JSON-файл."""
|
||
try:
|
||
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(settings, f, indent=4, ensure_ascii=False)
|
||
logging.info("Настройки сохранены в файл.")
|
||
except Exception as e:
|
||
logging.error(f"Ошибка при сохранении настроек: {e}", exc_info=True)
|
||
|
||
def list_serial_ports():
|
||
"""Получение списка доступных последовательных портов."""
|
||
ports = serial.tools.list_ports.comports()
|
||
logging.debug(f"Найдено {len(ports)} серийных портов.")
|
||
return [port.device for port in ports]
|
||
|
||
# ==========================
|
||
# Функции работы с сетевыми адаптерами (не используются)
|
||
# ==========================
|
||
|
||
# def list_network_adapters():
|
||
# """Возвращает список названий сетевых адаптеров (Windows)."""
|
||
# adapters = []
|
||
# if platform.system() == "Windows":
|
||
# try:
|
||
# output = subprocess.check_output(
|
||
# 'wmic nic get NetConnectionID',
|
||
# shell=True,
|
||
# encoding="cp866"
|
||
# )
|
||
# for line in output.splitlines():
|
||
# line = line.strip()
|
||
# if line and line != "NetConnectionID":
|
||
# adapters.append(line)
|
||
# except Exception as e:
|
||
# logging.error(f"Ошибка при получении списка адаптеров: {e}", exc_info=True)
|
||
# else:
|
||
# adapters = ["eth0"]
|
||
# return adapters
|
||
|
||
# ==========================
|
||
# Функции работы с COM-соединением
|
||
# ==========================
|
||
|
||
def create_connection(settings):
|
||
"""Создание соединения с устройством через последовательный порт."""
|
||
try:
|
||
conn = serial.Serial(settings["port"], settings["baudrate"], timeout=1)
|
||
logging.info(f"Соединение установлено с {settings['port']} на {settings['baudrate']}.")
|
||
time.sleep(1)
|
||
return conn
|
||
except SerialException as e:
|
||
logging.error(f"Ошибка подключения: {e}", exc_info=True)
|
||
except Exception as e:
|
||
logging.error(f"Неизвестная ошибка подключения: {e}", exc_info=True)
|
||
return None
|
||
|
||
def read_response(serial_connection, timeout, login=None, password=None, is_gui=False):
|
||
"""
|
||
Чтение ответа от устройства с учётом таймаута.
|
||
Если в последней строке появляется запрос логина или пароля, данные отправляются автоматически.
|
||
"""
|
||
response = b""
|
||
end_time = time.time() + timeout
|
||
decoded = ""
|
||
while time.time() < end_time:
|
||
if serial_connection.in_waiting:
|
||
chunk = serial_connection.read(serial_connection.in_waiting)
|
||
response += chunk
|
||
|
||
if b"--More--" in response:
|
||
serial_connection.write(b"\n")
|
||
response = response.replace(b"--More--", b"")
|
||
|
||
try:
|
||
decoded = response.decode(errors="ignore")
|
||
except Exception:
|
||
decoded = ""
|
||
lines = decoded.rstrip().splitlines()
|
||
if lines:
|
||
last_line = lines[-1].strip()
|
||
|
||
if re.search(r'(login:|username:)$', last_line, re.IGNORECASE):
|
||
if not login:
|
||
if is_gui:
|
||
login = simpledialog.askstring("Login", "Введите логин:")
|
||
if login is None:
|
||
login = ""
|
||
else:
|
||
login = input("Введите логин: ")
|
||
serial_connection.write((login + "\n").encode())
|
||
logging.info("Отправлен логин.")
|
||
response = b""
|
||
continue
|
||
|
||
if re.search(r'(password:)$', last_line, re.IGNORECASE):
|
||
if not password:
|
||
if is_gui:
|
||
password = simpledialog.askstring("Password", "Введите пароль:", show="*")
|
||
if password is None:
|
||
password = ""
|
||
else:
|
||
password = getpass("Введите пароль: ")
|
||
serial_connection.write((password + "\n").encode())
|
||
logging.info("Отправлен пароль.")
|
||
response = b""
|
||
continue
|
||
|
||
if last_line.endswith(">") or last_line.endswith("#"):
|
||
break
|
||
else:
|
||
time.sleep(0.1)
|
||
return decoded
|
||
|
||
def generate_command_blocks(lines, block_size):
|
||
"""Генерация блоков команд для блочного копирования."""
|
||
blocks = []
|
||
current_block = []
|
||
for line in lines:
|
||
trimmed = line.strip()
|
||
if not trimmed:
|
||
continue
|
||
lower_line = trimmed.lower()
|
||
if lower_line.startswith("vlan") or lower_line.startswith("enable") or lower_line.startswith("interface"):
|
||
if current_block:
|
||
blocks.append("\n".join(current_block))
|
||
current_block = []
|
||
blocks.append(trimmed)
|
||
elif lower_line.startswith("exit"):
|
||
current_block.append(trimmed)
|
||
blocks.append("\n".join(current_block))
|
||
current_block = []
|
||
else:
|
||
current_block.append(trimmed)
|
||
if len(current_block) >= block_size:
|
||
blocks.append("\n".join(current_block))
|
||
current_block = []
|
||
if current_block:
|
||
blocks.append("\n".join(current_block))
|
||
return blocks
|
||
|
||
def execute_commands_from_file(
|
||
serial_connection,
|
||
filename,
|
||
timeout,
|
||
copy_mode,
|
||
block_size,
|
||
log_callback=None,
|
||
login=None,
|
||
password=None,
|
||
is_gui=False,
|
||
):
|
||
"""
|
||
Выполнение команд из файла конфигурации.
|
||
Если передан log_callback, вывод будет отображаться в GUI.
|
||
"""
|
||
try:
|
||
with open(filename, "r", encoding="utf-8") as file:
|
||
lines = [line for line in file if line.strip()]
|
||
msg = f"Выполнение команд из файла: {filename}\n"
|
||
logging.info(msg)
|
||
if log_callback:
|
||
log_callback(msg)
|
||
if copy_mode == "line":
|
||
for cmd in lines:
|
||
cmd = cmd.strip()
|
||
msg = f"\nОтправка команды: {cmd}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
serial_connection.write((cmd + "\n").encode())
|
||
logging.info(f"Отправлена команда: {cmd}")
|
||
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
|
||
if response:
|
||
msg = f"Ответ устройства:\n{response}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.info(f"Ответ устройства:\n{response}")
|
||
else:
|
||
msg = f"Ответ не получен для команды: {cmd}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.warning(f"Нет ответа для команды: {cmd}")
|
||
time.sleep(1)
|
||
elif copy_mode == "block":
|
||
blocks = generate_command_blocks(lines, block_size)
|
||
for block in blocks:
|
||
msg = f"\nОтправка блока команд:\n{block}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
serial_connection.write((block + "\n").encode())
|
||
logging.info(f"Отправлен блок команд:\n{block}")
|
||
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
|
||
if response:
|
||
msg = f"Ответ устройства:\n{response}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.info(f"Ответ устройства:\n{response}")
|
||
else:
|
||
msg = f"Ответ не получен для блока:\n{block}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.warning(f"Нет ответа для блока:\n{block}")
|
||
time.sleep(1)
|
||
except SerialException as e:
|
||
msg = f"Ошибка при выполнении команды: {e}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.error(f"Ошибка при выполнении команды: {e}", exc_info=True)
|
||
except Exception as e:
|
||
msg = f"Ошибка при выполнении команд: {e}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True)
|
||
|
||
# ==========================
|
||
# Реализация TFTP-сервера для раздачи папки Firmware
|
||
# ==========================
|
||
|
||
class TFTPServerThread(threading.Thread):
|
||
def __init__(self, host, port, log_callback):
|
||
super().__init__()
|
||
self.host = host
|
||
self.port = port
|
||
self.log_callback = log_callback
|
||
self.running = True
|
||
self.sock = None
|
||
|
||
def run(self):
|
||
try:
|
||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
self.sock.bind((self.host, self.port))
|
||
self.log_callback(f"TFTP сервер запущен на {self.host}:{self.port}\n")
|
||
except Exception as e:
|
||
self.log_callback(f"Ошибка запуска TFTP сервера: {e}\n")
|
||
return
|
||
|
||
while self.running:
|
||
try:
|
||
self.sock.settimeout(1)
|
||
data, addr = self.sock.recvfrom(1024)
|
||
if data:
|
||
self.handle_rrq(data, addr)
|
||
except socket.timeout:
|
||
continue
|
||
except Exception as e:
|
||
self.log_callback(f"Ошибка: {e}\n")
|
||
if self.sock:
|
||
self.sock.close()
|
||
self.log_callback("TFTP сервер остановлен.\n")
|
||
|
||
def handle_rrq(self, data, addr):
|
||
opcode = int.from_bytes(data[0:2], byteorder='big')
|
||
if opcode != 1:
|
||
self.log_callback(f"Получен не RRQ запрос от {addr}\n")
|
||
return
|
||
parts = data[2:].split(b'\0')
|
||
if len(parts) < 2:
|
||
self.log_callback(f"Неверный формат RRQ от {addr}\n")
|
||
return
|
||
req_filename = parts[0].decode('utf-8', errors='ignore')
|
||
mode = parts[1].decode('utf-8', errors='ignore')
|
||
self.log_callback(f"Получен RRQ для файла '{req_filename}' (режим {mode}) от {addr}\n")
|
||
filepath = os.path.join("Firmware", req_filename)
|
||
if not os.path.exists(filepath):
|
||
self.log_callback(f"Файл '{req_filename}' не найден в папке Firmware\n")
|
||
return
|
||
try:
|
||
filesize = os.path.getsize(filepath)
|
||
f = open(filepath, 'rb')
|
||
except Exception as e:
|
||
self.log_callback(f"Ошибка открытия файла '{req_filename}': {e}\n")
|
||
return
|
||
block_num = 1
|
||
bytes_sent = 0
|
||
start_time = time.time()
|
||
while True:
|
||
block_data = f.read(512)
|
||
data_packet = b'\x00\x03' + block_num.to_bytes(2, byteorder='big') + block_data
|
||
self.sock.sendto(data_packet, addr)
|
||
self.log_callback(f"Отправлен блок {block_num} ({len(block_data)} байт)\n")
|
||
try:
|
||
self.sock.settimeout(5)
|
||
ack, ack_addr = self.sock.recvfrom(1024)
|
||
if ack_addr != addr:
|
||
continue
|
||
ack_opcode = int.from_bytes(ack[0:2], byteorder='big')
|
||
ack_block = int.from_bytes(ack[2:4], byteorder='big')
|
||
if ack_opcode != 4 or ack_block != block_num:
|
||
self.log_callback(f"Неверный ACK от {addr}\n")
|
||
break
|
||
except socket.timeout:
|
||
self.log_callback("Таймаут ожидания ACK\n")
|
||
break
|
||
bytes_sent += len(block_data)
|
||
elapsed = time.time() - start_time
|
||
speed = bytes_sent / elapsed if elapsed > 0 else 0
|
||
progress = (bytes_sent / filesize) * 100 if filesize > 0 else 0
|
||
self.log_callback(f"Прогресс: {progress:.2f}% | Скорость: {speed:.2f} байт/с\n")
|
||
if len(block_data) < 512:
|
||
break
|
||
block_num += 1
|
||
f.close()
|
||
|
||
def stop(self):
|
||
self.running = False
|
||
|
||
# ==========================
|
||
# Графический интерфейс (Tkinter) с улучшенной визуализацией
|
||
# ==========================
|
||
|
||
class SerialAppGUI(tk.Tk):
|
||
def __init__(self, settings):
|
||
super().__init__()
|
||
self.title("Serial Device Manager")
|
||
self.geometry("900x700")
|
||
# Настройка ttk стилей
|
||
self.style = ttk.Style(self)
|
||
self.style.theme_use("clam")
|
||
default_font = ("Segoe UI", 10)
|
||
self.option_add("*Font", default_font)
|
||
self.settings = settings
|
||
self.connection = None
|
||
self.tftp_thread = None
|
||
self.create_menu()
|
||
self.create_tabs()
|
||
|
||
def create_menu(self):
|
||
menubar = tk.Menu(self)
|
||
self.config(menu=menubar)
|
||
file_menu = tk.Menu(menubar, tearoff=0)
|
||
file_menu.add_command(label="Выход", command=self.quit)
|
||
menubar.add_cascade(label="Файл", menu=file_menu)
|
||
|
||
def create_tabs(self):
|
||
notebook = ttk.Notebook(self)
|
||
notebook.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
# Вкладка "Настройки"
|
||
self.settings_frame = ttk.Frame(notebook, padding=10)
|
||
notebook.add(self.settings_frame, text="Настройки")
|
||
self.create_settings_tab(self.settings_frame)
|
||
|
||
# Вкладка "Интерактивный режим"
|
||
self.interactive_frame = ttk.Frame(notebook, padding=10)
|
||
notebook.add(self.interactive_frame, text="Интерактивный режим")
|
||
self.create_interactive_tab(self.interactive_frame)
|
||
|
||
# Вкладка "Выполнить команды из файла"
|
||
self.file_exec_frame = ttk.Frame(notebook, padding=10)
|
||
notebook.add(self.file_exec_frame, text="Выполнить команды из файла")
|
||
self.create_file_exec_tab(self.file_exec_frame)
|
||
|
||
# Вкладка "Редактор конфигурационного файла"
|
||
self.config_editor_frame = ttk.Frame(notebook, padding=10)
|
||
notebook.add(self.config_editor_frame, text="Редактор конфигурационного файла")
|
||
self.create_config_editor_tab(self.config_editor_frame)
|
||
|
||
# Вкладка "TFTP Сервер"
|
||
self.tftp_frame = ttk.Frame(notebook, padding=10)
|
||
notebook.add(self.tftp_frame, text="TFTP Сервер")
|
||
self.create_tftp_tab(self.tftp_frame)
|
||
|
||
# -------------- Вкладка "Настройки" --------------
|
||
def create_settings_tab(self, frame):
|
||
# Используем grid с отступами
|
||
ttk.Label(frame, text="COM-порт:").grid(row=0, column=0, sticky=tk.E, padx=5, pady=5)
|
||
self.port_var = StringVar(value=self.settings.get("port") or "")
|
||
self.port_combo = ttk.Combobox(frame, textvariable=self.port_var, values=list_serial_ports(), width=20)
|
||
self.port_combo.grid(row=0, column=1, sticky=tk.W, padx=5, pady=5)
|
||
ttk.Button(frame, text="Обновить", command=self.update_ports).grid(row=0, column=2, padx=5, pady=5)
|
||
|
||
ttk.Label(frame, text="Baudrate:").grid(row=1, column=0, sticky=tk.E, padx=5, pady=5)
|
||
self.baud_var = StringVar(value=str(self.settings.get("baudrate")))
|
||
ttk.Entry(frame, textvariable=self.baud_var, width=10).grid(row=1, column=1, sticky=tk.W, padx=5, pady=5)
|
||
|
||
ttk.Label(frame, text="Файл конфигурации:").grid(row=2, column=0, sticky=tk.E, padx=5, pady=5)
|
||
self.config_var = StringVar(value=self.settings.get("config_file") or "")
|
||
ttk.Entry(frame, textvariable=self.config_var, width=40).grid(row=2, column=1, sticky=tk.W, padx=5, pady=5)
|
||
ttk.Button(frame, text="Выбрать", command=self.select_config_file).grid(row=2, column=2, padx=5, pady=5)
|
||
|
||
ttk.Label(frame, text="Логин:").grid(row=3, column=0, sticky=tk.E, padx=5, pady=5)
|
||
self.login_var = StringVar(value=self.settings.get("login") or "")
|
||
ttk.Entry(frame, textvariable=self.login_var, width=20).grid(row=3, column=1, sticky=tk.W, padx=5, pady=5)
|
||
|
||
ttk.Label(frame, text="Пароль:").grid(row=4, column=0, sticky=tk.E, padx=5, pady=5)
|
||
self.password_var = StringVar(value=self.settings.get("password") or "")
|
||
ttk.Entry(frame, textvariable=self.password_var, show="*", width=20).grid(row=4, column=1, sticky=tk.W, padx=5, pady=5)
|
||
|
||
ttk.Label(frame, text="Timeout (сек):").grid(row=5, column=0, sticky=tk.E, padx=5, pady=5)
|
||
self.timeout_var = StringVar(value=str(self.settings.get("timeout")))
|
||
ttk.Entry(frame, textvariable=self.timeout_var, width=10).grid(row=5, column=1, sticky=tk.W, padx=5, pady=5)
|
||
|
||
ttk.Label(frame, text="Режим копирования:").grid(row=6, column=0, sticky=tk.E, padx=5, pady=5)
|
||
self.copy_mode_var = StringVar(value=self.settings.get("copy_mode"))
|
||
ttk.Radiobutton(frame, text="Построчно", variable=self.copy_mode_var, value="line").grid(row=6, column=1, sticky=tk.W, padx=5)
|
||
ttk.Radiobutton(frame, text="Блочно", variable=self.copy_mode_var, value="block").grid(row=6, column=1, padx=100, sticky=tk.W)
|
||
|
||
ttk.Label(frame, text="Размер блока (строк):").grid(row=7, column=0, sticky=tk.E, padx=5, pady=5)
|
||
self.block_size_var = StringVar(value=str(self.settings.get("block_size")))
|
||
ttk.Entry(frame, textvariable=self.block_size_var, width=10).grid(row=7, column=1, sticky=tk.W, padx=5, pady=5)
|
||
|
||
ttk.Label(frame, text="Паттерн приглашения:").grid(row=8, column=0, sticky=tk.E, padx=5, pady=5)
|
||
self.prompt_var = StringVar(value=self.settings.get("prompt"))
|
||
ttk.Entry(frame, textvariable=self.prompt_var, width=20).grid(row=8, column=1, sticky=tk.W, padx=5, pady=5)
|
||
|
||
ttk.Button(frame, text="Сохранить настройки", command=self.save_settings).grid(row=9, column=0, padx=5, pady=10)
|
||
ttk.Button(frame, text="Проверить соединение", command=self.check_connection).grid(row=9, column=1, padx=5, pady=10)
|
||
|
||
def update_ports(self):
|
||
ports = list_serial_ports()
|
||
self.port_combo['values'] = ports
|
||
messagebox.showinfo("Информация", "Список портов обновлен.")
|
||
|
||
def select_config_file(self):
|
||
filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")])
|
||
if filename:
|
||
self.config_var.set(filename)
|
||
|
||
def save_settings(self):
|
||
self.settings["port"] = self.port_var.get()
|
||
try:
|
||
self.settings["baudrate"] = int(self.baud_var.get())
|
||
except ValueError:
|
||
messagebox.showerror("Ошибка", "Некорректное значение baudrate!")
|
||
return
|
||
self.settings["config_file"] = self.config_var.get()
|
||
self.settings["login"] = self.login_var.get()
|
||
self.settings["password"] = self.password_var.get()
|
||
try:
|
||
self.settings["timeout"] = int(self.timeout_var.get())
|
||
except ValueError:
|
||
messagebox.showerror("Ошибка", "Некорректное значение timeout!")
|
||
return
|
||
self.settings["copy_mode"] = self.copy_mode_var.get()
|
||
try:
|
||
self.settings["block_size"] = int(self.block_size_var.get())
|
||
except ValueError:
|
||
messagebox.showerror("Ошибка", "Некорректное значение размера блока!")
|
||
return
|
||
self.settings["prompt"] = self.prompt_var.get()
|
||
settings_save(self.settings)
|
||
messagebox.showinfo("Информация", "Настройки сохранены.")
|
||
|
||
def check_connection(self):
|
||
if not self.settings.get("port"):
|
||
messagebox.showerror("Ошибка", "COM-порт не выбран!")
|
||
return
|
||
conn = create_connection(self.settings)
|
||
if conn:
|
||
messagebox.showinfo("Информация", "Соединение установлено успешно!")
|
||
conn.close()
|
||
else:
|
||
messagebox.showerror("Ошибка", "Не удалось установить соединение.")
|
||
|
||
# -------------- Вкладка "Интерактивный режим" --------------
|
||
def create_interactive_tab(self, frame):
|
||
control_frame = ttk.Frame(frame)
|
||
control_frame.pack(fill=X, pady=5)
|
||
ttk.Button(control_frame, text="Подключиться", command=self.connect_device).pack(side=LEFT, padx=5)
|
||
ttk.Button(control_frame, text="Отключиться", command=self.disconnect_device).pack(side=LEFT, padx=5)
|
||
|
||
self.interactive_text = tk.Text(frame, wrap="word", height=20)
|
||
self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
input_frame = ttk.Frame(frame)
|
||
input_frame.pack(fill=X, pady=5)
|
||
ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5)
|
||
self.command_entry = ttk.Entry(input_frame, width=50)
|
||
self.command_entry.pack(side=LEFT, padx=5)
|
||
ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5)
|
||
|
||
def connect_device(self):
|
||
if self.connection:
|
||
messagebox.showinfo("Информация", "Уже подключено.")
|
||
return
|
||
if not self.settings.get("port"):
|
||
messagebox.showerror("Ошибка", "COM-порт не выбран!")
|
||
return
|
||
self.connection = create_connection(self.settings)
|
||
if self.connection:
|
||
self.append_interactive_text("[INFO] Подключение установлено.\n")
|
||
else:
|
||
self.append_interactive_text("[ERROR] Не удалось установить соединение.\n")
|
||
|
||
def disconnect_device(self):
|
||
if self.connection:
|
||
try:
|
||
self.connection.close()
|
||
except Exception:
|
||
pass
|
||
self.connection = None
|
||
self.append_interactive_text("[INFO] Соединение закрыто.\n")
|
||
else:
|
||
messagebox.showinfo("Информация", "Соединение не установлено.")
|
||
|
||
def send_command(self):
|
||
if not self.connection:
|
||
messagebox.showerror("Ошибка", "Сначала установите соединение!")
|
||
return
|
||
cmd = self.command_entry.get().strip()
|
||
if not cmd:
|
||
return
|
||
self.append_interactive_text(f"[INFO] Отправка команды: {cmd}\n")
|
||
threading.Thread(target=self.process_command, args=(cmd,), daemon=True).start()
|
||
|
||
def process_command(self, cmd):
|
||
try:
|
||
self.connection.write((cmd + "\n").encode())
|
||
logging.info(f"Отправлена команда: {cmd}")
|
||
response = read_response(
|
||
self.connection,
|
||
self.settings.get("timeout", 10),
|
||
login=self.settings.get("login"),
|
||
password=self.settings.get("password"),
|
||
is_gui=True,
|
||
)
|
||
if response:
|
||
self.append_interactive_text(f"[INFO] Ответ устройства:\n{response}\n")
|
||
logging.info(f"Получен ответ:\n{response}")
|
||
else:
|
||
self.append_interactive_text("[WARN] Ответ не получен.\n")
|
||
logging.warning("Нет ответа от устройства в течение таймаута.")
|
||
except SerialException as e:
|
||
self.append_interactive_text(f"[ERROR] Ошибка при отправке команды: {e}\n")
|
||
logging.error(f"Ошибка отправки команды: {e}", exc_info=True)
|
||
except Exception as e:
|
||
self.append_interactive_text(f"[ERROR] Неизвестная ошибка: {e}\n")
|
||
logging.error(f"Неизвестная ошибка: {e}", exc_info=True)
|
||
|
||
def append_interactive_text(self, text):
|
||
self.interactive_text.insert(END, text)
|
||
self.interactive_text.see(END)
|
||
|
||
# -------------- Вкладка "Выполнить команды из файла" --------------
|
||
def create_file_exec_tab(self, frame):
|
||
file_frame = ttk.Frame(frame)
|
||
file_frame.pack(fill=X, pady=5)
|
||
ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
|
||
self.file_exec_var = StringVar(value=self.settings.get("config_file") or "")
|
||
ttk.Entry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5)
|
||
ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5)
|
||
ttk.Button(frame, text="Выполнить команды", command=self.execute_file_commands).pack(pady=5)
|
||
self.file_exec_text = tk.Text(frame, wrap="word", height=15)
|
||
self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
def select_config_file_fileexec(self):
|
||
filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")])
|
||
if filename:
|
||
self.file_exec_var.set(filename)
|
||
|
||
def execute_file_commands(self):
|
||
if not self.settings.get("port"):
|
||
messagebox.showerror("Ошибка", "COM-порт не выбран!")
|
||
return
|
||
if not self.file_exec_var.get():
|
||
messagebox.showerror("Ошибка", "Файл конфигурации не выбран!")
|
||
return
|
||
if not self.connection:
|
||
self.connection = create_connection(self.settings)
|
||
if not self.connection:
|
||
self.append_file_exec_text("[ERROR] Не удалось установить соединение.\n")
|
||
return
|
||
threading.Thread(
|
||
target=execute_commands_from_file,
|
||
args=(
|
||
self.connection,
|
||
self.file_exec_var.get(),
|
||
self.settings.get("timeout", 10),
|
||
self.settings.get("copy_mode", "line"),
|
||
self.settings.get("block_size", 15),
|
||
self.append_file_exec_text,
|
||
self.settings.get("login"),
|
||
self.settings.get("password"),
|
||
True,
|
||
),
|
||
daemon=True,
|
||
).start()
|
||
|
||
def append_file_exec_text(self, text):
|
||
self.file_exec_text.insert(END, text)
|
||
self.file_exec_text.see(END)
|
||
|
||
# -------------- Вкладка "Редактор конфигурационного файла" --------------
|
||
def create_config_editor_tab(self, frame):
|
||
top_frame = ttk.Frame(frame)
|
||
top_frame.pack(fill=X, pady=5)
|
||
ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
|
||
self.editor_file_var = StringVar(value=self.settings.get("config_file") or "")
|
||
ttk.Entry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5)
|
||
ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5)
|
||
ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5)
|
||
ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5)
|
||
self.config_editor_text = tk.Text(frame, wrap="word")
|
||
self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
def select_config_file_editor(self):
|
||
filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")])
|
||
if filename:
|
||
self.editor_file_var.set(filename)
|
||
self.settings["config_file"] = filename
|
||
settings_save(self.settings)
|
||
|
||
def load_config_file(self):
|
||
filename = self.editor_file_var.get()
|
||
if not filename or not os.path.exists(filename):
|
||
messagebox.showerror("Ошибка", "Файл конфигурации не выбран или не существует.")
|
||
return
|
||
try:
|
||
with open(filename, "r", encoding="utf-8") as f:
|
||
content = f.read()
|
||
self.config_editor_text.delete("1.0", END)
|
||
self.config_editor_text.insert(END, content)
|
||
messagebox.showinfo("Информация", "Файл загружен.")
|
||
except Exception as e:
|
||
logging.error(f"Ошибка загрузки файла: {e}", exc_info=True)
|
||
messagebox.showerror("Ошибка", f"Не удалось загрузить файл:\n{e}")
|
||
|
||
def save_config_file(self):
|
||
filename = self.editor_file_var.get()
|
||
if not filename:
|
||
messagebox.showerror("Ошибка", "Файл конфигурации не выбран.")
|
||
return
|
||
try:
|
||
content = self.config_editor_text.get("1.0", END)
|
||
with open(filename, "w", encoding="utf-8") as f:
|
||
f.write(content)
|
||
messagebox.showinfo("Информация", "Файл сохранён.")
|
||
except Exception as e:
|
||
logging.error(f"Ошибка сохранения файла: {e}", exc_info=True)
|
||
messagebox.showerror("Ошибка", f"Не удалось сохранить файл:\n{e}")
|
||
|
||
# -------------- Вкладка "TFTP Сервер" --------------
|
||
def create_tftp_tab(self, frame):
|
||
ip_frame = ttk.Frame(frame)
|
||
ip_frame.pack(fill=X, pady=2)
|
||
ttk.Label(ip_frame, text="Слушать на IP:").pack(side=LEFT, padx=5)
|
||
self.tftp_listen_ip_var = StringVar(value="0.0.0.0")
|
||
ttk.Entry(ip_frame, textvariable=self.tftp_listen_ip_var, width=15).pack(side=LEFT, padx=5)
|
||
|
||
port_frame = ttk.Frame(frame)
|
||
port_frame.pack(fill=X, pady=2)
|
||
ttk.Label(port_frame, text="Порт:").pack(side=LEFT, padx=5)
|
||
self.tftp_port_var = StringVar(value="6969")
|
||
ttk.Entry(port_frame, textvariable=self.tftp_port_var, width=10).pack(side=LEFT, padx=5)
|
||
|
||
folder_frame = ttk.Frame(frame)
|
||
folder_frame.pack(fill=X, pady=2)
|
||
ttk.Label(folder_frame, text="Папка прошивок:").pack(side=LEFT, padx=5)
|
||
self.firmware_folder_var = StringVar(value="Firmware")
|
||
ttk.Label(folder_frame, textvariable=self.firmware_folder_var).pack(side=LEFT, padx=5)
|
||
ttk.Button(folder_frame, text="Открыть папку", command=lambda: os.startfile(os.path.abspath("Firmware"))).pack(side=LEFT, padx=5)
|
||
|
||
button_frame = ttk.Frame(frame)
|
||
button_frame.pack(fill=X, pady=2)
|
||
ttk.Button(button_frame, text="Запустить TFTP сервер", command=self.start_tftp_server).pack(side=LEFT, padx=5)
|
||
ttk.Button(button_frame, text="Остановить TFTP сервер", command=self.stop_tftp_server).pack(side=LEFT, padx=5)
|
||
|
||
self.tftp_console = tk.Text(frame, wrap="word", height=15)
|
||
self.tftp_console.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
def append_tftp_console(self, text):
|
||
self.tftp_console.insert(END, text)
|
||
self.tftp_console.see(END)
|
||
|
||
def start_tftp_server(self):
|
||
listen_ip = self.tftp_listen_ip_var.get()
|
||
try:
|
||
port = int(self.tftp_port_var.get())
|
||
except ValueError:
|
||
messagebox.showerror("Ошибка", "Некорректное значение порта.")
|
||
return
|
||
self.tftp_thread = TFTPServerThread(listen_ip, port, self.append_tftp_console)
|
||
self.tftp_thread.start()
|
||
|
||
def stop_tftp_server(self):
|
||
if self.tftp_thread:
|
||
self.tftp_thread.stop()
|
||
self.tftp_thread.join()
|
||
self.tftp_thread = None
|
||
self.append_tftp_console("TFTP сервер остановлен.\n")
|
||
|
||
# ==========================
|
||
# Парсер аргументов (не используется)
|
||
# ==========================
|
||
# def parse_arguments():
|
||
# parser = argparse.ArgumentParser(
|
||
# description="Программа для работы с устройствами через последовательный порт с графическим интерфейсом."
|
||
# )
|
||
# parser.add_argument("--cli", action="store_true", help="Запустить в режиме командной строки (без графики)")
|
||
# return parser.parse_args()
|
||
|
||
|
||
|
||
# ==========================
|
||
# Режим командной строки (не используется)
|
||
# ==========================
|
||
|
||
# def run_cli_mode(settings):
|
||
# print("Запущен режим командной строки.")
|
||
# while True:
|
||
# print("\n--- Главное меню ---")
|
||
# print("1. Подключиться и войти в интерактивный режим")
|
||
# print("2. Выполнить команды из файла конфигурации")
|
||
# print("0. Выход")
|
||
# choice = input("Выберите действие: ").strip()
|
||
# if choice == "1":
|
||
# if not settings.get("port"):
|
||
# print("[ERROR] COM-порт не выбран!")
|
||
# continue
|
||
# conn = create_connection(settings)
|
||
# if not conn:
|
||
# print("[ERROR] Не удалось установить соединение.")
|
||
# continue
|
||
# print("[INFO] Введите команды (для выхода введите _exit):")
|
||
# while True:
|
||
# cmd = input("Команда: ")
|
||
# if cmd.strip().lower() == "_exit":
|
||
# break
|
||
# try:
|
||
# conn.write((cmd + "\n").encode())
|
||
# response = read_response(
|
||
# conn, settings.get("timeout", 10),
|
||
# login=settings.get("login"),
|
||
# password=settings.get("password"),
|
||
# is_gui=False
|
||
# )
|
||
# print(response)
|
||
# except Exception as e:
|
||
# print(f"[ERROR] {e}")
|
||
# break
|
||
# conn.close()
|
||
# elif choice == "2":
|
||
# if not settings.get("config_file"):
|
||
# print("[ERROR] Файл конфигурации не выбран!")
|
||
# continue
|
||
# if not settings.get("port"):
|
||
# print("[ERROR] COM-порт не выбран!")
|
||
# continue
|
||
# conn = create_connection(settings)
|
||
# if conn:
|
||
# execute_commands_from_file(
|
||
# conn,
|
||
# settings["config_file"],
|
||
# settings.get("timeout", 10),
|
||
# settings.get("copy_mode", "line"),
|
||
# settings.get("block_size", 15),
|
||
# lambda msg: print(msg),
|
||
# login=settings.get("login"),
|
||
# password=settings.get("password"),
|
||
# is_gui=False,
|
||
# )
|
||
# conn.close()
|
||
# else:
|
||
# print("[ERROR] Не удалось установить соединение.")
|
||
# elif choice == "0":
|
||
# break
|
||
# else:
|
||
# print("[ERROR] Некорректный выбор.")
|
||
|
||
|
||
# ==========================
|
||
# Основной запуск приложения
|
||
# ==========================
|
||
def main():
|
||
setup_logging()
|
||
settings = settings_load()
|
||
app = SerialAppGUI(settings)
|
||
app.mainloop()
|
||
|
||
# ==========================
|
||
# Основной запуск приложения
|
||
# ==========================
|
||
if __name__ == "__main__":
|
||
try:
|
||
main()
|
||
except KeyboardInterrupt:
|
||
logging.info("Программа прервана пользователем (KeyboardInterrupt).")
|
||
sys.exit(0)
|
||
except Exception as e:
|
||
logging.critical(f"Неизвестная ошибка: {e}", exc_info=True)
|
||
sys.exit(1)
|