Первый коммит, тест <3
This commit is contained in:
6
.gitignore
vendored
Normal file
6
.gitignore
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
app.log
|
||||
Settings/settings.json
|
||||
Configs/Eltex MES2424 AC - Сеть FTTB 2G, доп.txt
|
||||
Configs/конфиг доп 3750-52 с айпи 172.17.141.133 .txt
|
||||
DALL·E 2024-12-29 01.01.02 - Square vector logo_ A clean and minimalistic app icon for serial port management software. The design prominently features a simplified rectangular CO.ico
|
||||
test.py
|
||||
874
ComConfigCopy.py
Normal file
874
ComConfigCopy.py
Normal file
@@ -0,0 +1,874 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
# import platform Использовался для получения списка сетевых адаптеров
|
||||
import re
|
||||
import socket
|
||||
# import subprocess Использовался для получения списка сетевых адаптеров
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from getpass import getpass
|
||||
from logging.handlers import RotatingFileHandler
|
||||
import tkinter as tk
|
||||
from tkinter import (
|
||||
Tk,
|
||||
Frame,
|
||||
StringVar,
|
||||
END,
|
||||
BOTH,
|
||||
LEFT,
|
||||
X,
|
||||
W,
|
||||
filedialog,
|
||||
messagebox,
|
||||
simpledialog,
|
||||
)
|
||||
from tkinter import ttk
|
||||
|
||||
import serial
|
||||
import serial.tools.list_ports
|
||||
from serial.serialutil import SerialException
|
||||
|
||||
# Создаем необходимые папки
|
||||
os.makedirs("Logs", exist_ok=True)
|
||||
os.makedirs("Configs", exist_ok=True)
|
||||
os.makedirs("Settings", exist_ok=True)
|
||||
os.makedirs("Firmware", exist_ok=True)
|
||||
|
||||
# Файл настроек находится в папке Settings
|
||||
SETTINGS_FILE = os.path.join("Settings", "settings.json")
|
||||
|
||||
# ==========================
|
||||
# Функции работы с настройками и логированием
|
||||
# ==========================
|
||||
|
||||
def setup_logging():
|
||||
"""Настройка логирования с использованием RotatingFileHandler."""
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel(logging.DEBUG)
|
||||
log_path = os.path.join("Logs", "app.log")
|
||||
handler = RotatingFileHandler(
|
||||
log_path, maxBytes=5 * 1024 * 1024, backupCount=3, encoding="utf-8"
|
||||
)
|
||||
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
|
||||
handler.setFormatter(formatter)
|
||||
logger.addHandler(handler)
|
||||
|
||||
def settings_load():
|
||||
"""Загрузка настроек из JSON-файла или создание настроек по умолчанию."""
|
||||
default_settings = {
|
||||
"port": None,
|
||||
"baudrate": 9600,
|
||||
"config_file": None,
|
||||
"login": None,
|
||||
"password": None,
|
||||
"timeout": 10,
|
||||
"copy_mode": "line", # 'line' или 'block'
|
||||
"block_size": 15,
|
||||
"prompt": ">", # используется для определения приглашения
|
||||
}
|
||||
if not os.path.exists(SETTINGS_FILE):
|
||||
try:
|
||||
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
|
||||
json.dump(default_settings, f, indent=4, ensure_ascii=False)
|
||||
logging.info("Файл настроек создан с настройками по умолчанию.")
|
||||
except Exception as e:
|
||||
logging.error(f"Ошибка при создании файла настроек: {e}", exc_info=True)
|
||||
return default_settings
|
||||
|
||||
try:
|
||||
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
|
||||
settings = json.load(f)
|
||||
for key, value in default_settings.items():
|
||||
if key not in settings:
|
||||
settings[key] = value
|
||||
return settings
|
||||
except Exception as e:
|
||||
logging.error(f"Ошибка загрузки файла настроек: {e}", exc_info=True)
|
||||
return default_settings
|
||||
|
||||
def settings_save(settings):
|
||||
"""Сохранение настроек в JSON-файл."""
|
||||
try:
|
||||
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
|
||||
json.dump(settings, f, indent=4, ensure_ascii=False)
|
||||
logging.info("Настройки сохранены в файл.")
|
||||
except Exception as e:
|
||||
logging.error(f"Ошибка при сохранении настроек: {e}", exc_info=True)
|
||||
|
||||
def list_serial_ports():
|
||||
"""Получение списка доступных последовательных портов."""
|
||||
ports = serial.tools.list_ports.comports()
|
||||
logging.debug(f"Найдено {len(ports)} серийных портов.")
|
||||
return [port.device for port in ports]
|
||||
|
||||
# ==========================
|
||||
# Функции работы с сетевыми адаптерами (не используются)
|
||||
# ==========================
|
||||
|
||||
# def list_network_adapters():
|
||||
# """Возвращает список названий сетевых адаптеров (Windows)."""
|
||||
# adapters = []
|
||||
# if platform.system() == "Windows":
|
||||
# try:
|
||||
# output = subprocess.check_output(
|
||||
# 'wmic nic get NetConnectionID',
|
||||
# shell=True,
|
||||
# encoding="cp866"
|
||||
# )
|
||||
# for line in output.splitlines():
|
||||
# line = line.strip()
|
||||
# if line and line != "NetConnectionID":
|
||||
# adapters.append(line)
|
||||
# except Exception as e:
|
||||
# logging.error(f"Ошибка при получении списка адаптеров: {e}", exc_info=True)
|
||||
# else:
|
||||
# adapters = ["eth0"]
|
||||
# return adapters
|
||||
|
||||
# ==========================
|
||||
# Функции работы с COM-соединением
|
||||
# ==========================
|
||||
|
||||
def create_connection(settings):
|
||||
"""Создание соединения с устройством через последовательный порт."""
|
||||
try:
|
||||
conn = serial.Serial(settings["port"], settings["baudrate"], timeout=1)
|
||||
logging.info(f"Соединение установлено с {settings['port']} на {settings['baudrate']}.")
|
||||
time.sleep(1)
|
||||
return conn
|
||||
except SerialException as e:
|
||||
logging.error(f"Ошибка подключения: {e}", exc_info=True)
|
||||
except Exception as e:
|
||||
logging.error(f"Неизвестная ошибка подключения: {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
def read_response(serial_connection, timeout, login=None, password=None, is_gui=False):
|
||||
"""
|
||||
Чтение ответа от устройства с учётом таймаута.
|
||||
Если в последней строке появляется запрос логина или пароля, данные отправляются автоматически.
|
||||
"""
|
||||
response = b""
|
||||
end_time = time.time() + timeout
|
||||
decoded = ""
|
||||
while time.time() < end_time:
|
||||
if serial_connection.in_waiting:
|
||||
chunk = serial_connection.read(serial_connection.in_waiting)
|
||||
response += chunk
|
||||
|
||||
if b"--More--" in response:
|
||||
serial_connection.write(b"\n")
|
||||
response = response.replace(b"--More--", b"")
|
||||
|
||||
try:
|
||||
decoded = response.decode(errors="ignore")
|
||||
except Exception:
|
||||
decoded = ""
|
||||
lines = decoded.rstrip().splitlines()
|
||||
if lines:
|
||||
last_line = lines[-1].strip()
|
||||
|
||||
if re.search(r'(login:|username:)$', last_line, re.IGNORECASE):
|
||||
if not login:
|
||||
if is_gui:
|
||||
login = simpledialog.askstring("Login", "Введите логин:")
|
||||
if login is None:
|
||||
login = ""
|
||||
else:
|
||||
login = input("Введите логин: ")
|
||||
serial_connection.write((login + "\n").encode())
|
||||
logging.info("Отправлен логин.")
|
||||
response = b""
|
||||
continue
|
||||
|
||||
if re.search(r'(password:)$', last_line, re.IGNORECASE):
|
||||
if not password:
|
||||
if is_gui:
|
||||
password = simpledialog.askstring("Password", "Введите пароль:", show="*")
|
||||
if password is None:
|
||||
password = ""
|
||||
else:
|
||||
password = getpass("Введите пароль: ")
|
||||
serial_connection.write((password + "\n").encode())
|
||||
logging.info("Отправлен пароль.")
|
||||
response = b""
|
||||
continue
|
||||
|
||||
if last_line.endswith(">") or last_line.endswith("#"):
|
||||
break
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
return decoded
|
||||
|
||||
def generate_command_blocks(lines, block_size):
|
||||
"""Генерация блоков команд для блочного копирования."""
|
||||
blocks = []
|
||||
current_block = []
|
||||
for line in lines:
|
||||
trimmed = line.strip()
|
||||
if not trimmed:
|
||||
continue
|
||||
lower_line = trimmed.lower()
|
||||
if lower_line.startswith("vlan") or lower_line.startswith("enable") or lower_line.startswith("interface"):
|
||||
if current_block:
|
||||
blocks.append("\n".join(current_block))
|
||||
current_block = []
|
||||
blocks.append(trimmed)
|
||||
elif lower_line.startswith("exit"):
|
||||
current_block.append(trimmed)
|
||||
blocks.append("\n".join(current_block))
|
||||
current_block = []
|
||||
else:
|
||||
current_block.append(trimmed)
|
||||
if len(current_block) >= block_size:
|
||||
blocks.append("\n".join(current_block))
|
||||
current_block = []
|
||||
if current_block:
|
||||
blocks.append("\n".join(current_block))
|
||||
return blocks
|
||||
|
||||
def execute_commands_from_file(
|
||||
serial_connection,
|
||||
filename,
|
||||
timeout,
|
||||
copy_mode,
|
||||
block_size,
|
||||
log_callback=None,
|
||||
login=None,
|
||||
password=None,
|
||||
is_gui=False,
|
||||
):
|
||||
"""
|
||||
Выполнение команд из файла конфигурации.
|
||||
Если передан log_callback, вывод будет отображаться в GUI.
|
||||
"""
|
||||
try:
|
||||
with open(filename, "r", encoding="utf-8") as file:
|
||||
lines = [line for line in file if line.strip()]
|
||||
msg = f"Выполнение команд из файла: {filename}\n"
|
||||
logging.info(msg)
|
||||
if log_callback:
|
||||
log_callback(msg)
|
||||
if copy_mode == "line":
|
||||
for cmd in lines:
|
||||
cmd = cmd.strip()
|
||||
msg = f"\nОтправка команды: {cmd}\n"
|
||||
if log_callback:
|
||||
log_callback(msg)
|
||||
serial_connection.write((cmd + "\n").encode())
|
||||
logging.info(f"Отправлена команда: {cmd}")
|
||||
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
|
||||
if response:
|
||||
msg = f"Ответ устройства:\n{response}\n"
|
||||
if log_callback:
|
||||
log_callback(msg)
|
||||
logging.info(f"Ответ устройства:\n{response}")
|
||||
else:
|
||||
msg = f"Ответ не получен для команды: {cmd}\n"
|
||||
if log_callback:
|
||||
log_callback(msg)
|
||||
logging.warning(f"Нет ответа для команды: {cmd}")
|
||||
time.sleep(1)
|
||||
elif copy_mode == "block":
|
||||
blocks = generate_command_blocks(lines, block_size)
|
||||
for block in blocks:
|
||||
msg = f"\nОтправка блока команд:\n{block}\n"
|
||||
if log_callback:
|
||||
log_callback(msg)
|
||||
serial_connection.write((block + "\n").encode())
|
||||
logging.info(f"Отправлен блок команд:\n{block}")
|
||||
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
|
||||
if response:
|
||||
msg = f"Ответ устройства:\n{response}\n"
|
||||
if log_callback:
|
||||
log_callback(msg)
|
||||
logging.info(f"Ответ устройства:\n{response}")
|
||||
else:
|
||||
msg = f"Ответ не получен для блока:\n{block}\n"
|
||||
if log_callback:
|
||||
log_callback(msg)
|
||||
logging.warning(f"Нет ответа для блока:\n{block}")
|
||||
time.sleep(1)
|
||||
except SerialException as e:
|
||||
msg = f"Ошибка при выполнении команды: {e}\n"
|
||||
if log_callback:
|
||||
log_callback(msg)
|
||||
logging.error(f"Ошибка при выполнении команды: {e}", exc_info=True)
|
||||
except Exception as e:
|
||||
msg = f"Ошибка при выполнении команд: {e}\n"
|
||||
if log_callback:
|
||||
log_callback(msg)
|
||||
logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True)
|
||||
|
||||
# ==========================
|
||||
# Реализация TFTP-сервера для раздачи папки Firmware
|
||||
# ==========================
|
||||
|
||||
class TFTPServerThread(threading.Thread):
|
||||
def __init__(self, host, port, log_callback):
|
||||
super().__init__()
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.log_callback = log_callback
|
||||
self.running = True
|
||||
self.sock = None
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
self.sock.bind((self.host, self.port))
|
||||
self.log_callback(f"TFTP сервер запущен на {self.host}:{self.port}\n")
|
||||
except Exception as e:
|
||||
self.log_callback(f"Ошибка запуска TFTP сервера: {e}\n")
|
||||
return
|
||||
|
||||
while self.running:
|
||||
try:
|
||||
self.sock.settimeout(1)
|
||||
data, addr = self.sock.recvfrom(1024)
|
||||
if data:
|
||||
self.handle_rrq(data, addr)
|
||||
except socket.timeout:
|
||||
continue
|
||||
except Exception as e:
|
||||
self.log_callback(f"Ошибка: {e}\n")
|
||||
if self.sock:
|
||||
self.sock.close()
|
||||
self.log_callback("TFTP сервер остановлен.\n")
|
||||
|
||||
def handle_rrq(self, data, addr):
|
||||
opcode = int.from_bytes(data[0:2], byteorder='big')
|
||||
if opcode != 1:
|
||||
self.log_callback(f"Получен не RRQ запрос от {addr}\n")
|
||||
return
|
||||
parts = data[2:].split(b'\0')
|
||||
if len(parts) < 2:
|
||||
self.log_callback(f"Неверный формат RRQ от {addr}\n")
|
||||
return
|
||||
req_filename = parts[0].decode('utf-8', errors='ignore')
|
||||
mode = parts[1].decode('utf-8', errors='ignore')
|
||||
self.log_callback(f"Получен RRQ для файла '{req_filename}' (режим {mode}) от {addr}\n")
|
||||
filepath = os.path.join("Firmware", req_filename)
|
||||
if not os.path.exists(filepath):
|
||||
self.log_callback(f"Файл '{req_filename}' не найден в папке Firmware\n")
|
||||
return
|
||||
try:
|
||||
filesize = os.path.getsize(filepath)
|
||||
f = open(filepath, 'rb')
|
||||
except Exception as e:
|
||||
self.log_callback(f"Ошибка открытия файла '{req_filename}': {e}\n")
|
||||
return
|
||||
block_num = 1
|
||||
bytes_sent = 0
|
||||
start_time = time.time()
|
||||
while True:
|
||||
block_data = f.read(512)
|
||||
data_packet = b'\x00\x03' + block_num.to_bytes(2, byteorder='big') + block_data
|
||||
self.sock.sendto(data_packet, addr)
|
||||
self.log_callback(f"Отправлен блок {block_num} ({len(block_data)} байт)\n")
|
||||
try:
|
||||
self.sock.settimeout(5)
|
||||
ack, ack_addr = self.sock.recvfrom(1024)
|
||||
if ack_addr != addr:
|
||||
continue
|
||||
ack_opcode = int.from_bytes(ack[0:2], byteorder='big')
|
||||
ack_block = int.from_bytes(ack[2:4], byteorder='big')
|
||||
if ack_opcode != 4 or ack_block != block_num:
|
||||
self.log_callback(f"Неверный ACK от {addr}\n")
|
||||
break
|
||||
except socket.timeout:
|
||||
self.log_callback("Таймаут ожидания ACK\n")
|
||||
break
|
||||
bytes_sent += len(block_data)
|
||||
elapsed = time.time() - start_time
|
||||
speed = bytes_sent / elapsed if elapsed > 0 else 0
|
||||
progress = (bytes_sent / filesize) * 100 if filesize > 0 else 0
|
||||
self.log_callback(f"Прогресс: {progress:.2f}% | Скорость: {speed:.2f} байт/с\n")
|
||||
if len(block_data) < 512:
|
||||
break
|
||||
block_num += 1
|
||||
f.close()
|
||||
|
||||
def stop(self):
|
||||
self.running = False
|
||||
|
||||
# ==========================
|
||||
# Графический интерфейс (Tkinter) с улучшенной визуализацией
|
||||
# ==========================
|
||||
|
||||
class SerialAppGUI(tk.Tk):
|
||||
def __init__(self, settings):
|
||||
super().__init__()
|
||||
self.title("Serial Device Manager")
|
||||
self.geometry("900x700")
|
||||
# Настройка ttk стилей
|
||||
self.style = ttk.Style(self)
|
||||
self.style.theme_use("clam")
|
||||
default_font = ("Segoe UI", 10)
|
||||
self.option_add("*Font", default_font)
|
||||
self.settings = settings
|
||||
self.connection = None
|
||||
self.tftp_thread = None
|
||||
self.create_menu()
|
||||
self.create_tabs()
|
||||
|
||||
def create_menu(self):
|
||||
menubar = tk.Menu(self)
|
||||
self.config(menu=menubar)
|
||||
file_menu = tk.Menu(menubar, tearoff=0)
|
||||
file_menu.add_command(label="Выход", command=self.quit)
|
||||
menubar.add_cascade(label="Файл", menu=file_menu)
|
||||
|
||||
def create_tabs(self):
|
||||
notebook = ttk.Notebook(self)
|
||||
notebook.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||||
|
||||
# Вкладка "Настройки"
|
||||
self.settings_frame = ttk.Frame(notebook, padding=10)
|
||||
notebook.add(self.settings_frame, text="Настройки")
|
||||
self.create_settings_tab(self.settings_frame)
|
||||
|
||||
# Вкладка "Интерактивный режим"
|
||||
self.interactive_frame = ttk.Frame(notebook, padding=10)
|
||||
notebook.add(self.interactive_frame, text="Интерактивный режим")
|
||||
self.create_interactive_tab(self.interactive_frame)
|
||||
|
||||
# Вкладка "Выполнить команды из файла"
|
||||
self.file_exec_frame = ttk.Frame(notebook, padding=10)
|
||||
notebook.add(self.file_exec_frame, text="Выполнить команды из файла")
|
||||
self.create_file_exec_tab(self.file_exec_frame)
|
||||
|
||||
# Вкладка "Редактор конфигурационного файла"
|
||||
self.config_editor_frame = ttk.Frame(notebook, padding=10)
|
||||
notebook.add(self.config_editor_frame, text="Редактор конфигурационного файла")
|
||||
self.create_config_editor_tab(self.config_editor_frame)
|
||||
|
||||
# Вкладка "TFTP Сервер"
|
||||
self.tftp_frame = ttk.Frame(notebook, padding=10)
|
||||
notebook.add(self.tftp_frame, text="TFTP Сервер")
|
||||
self.create_tftp_tab(self.tftp_frame)
|
||||
|
||||
# -------------- Вкладка "Настройки" --------------
|
||||
def create_settings_tab(self, frame):
|
||||
# Используем grid с отступами
|
||||
ttk.Label(frame, text="COM-порт:").grid(row=0, column=0, sticky=tk.E, padx=5, pady=5)
|
||||
self.port_var = StringVar(value=self.settings.get("port") or "")
|
||||
self.port_combo = ttk.Combobox(frame, textvariable=self.port_var, values=list_serial_ports(), width=20)
|
||||
self.port_combo.grid(row=0, column=1, sticky=tk.W, padx=5, pady=5)
|
||||
ttk.Button(frame, text="Обновить", command=self.update_ports).grid(row=0, column=2, padx=5, pady=5)
|
||||
|
||||
ttk.Label(frame, text="Baudrate:").grid(row=1, column=0, sticky=tk.E, padx=5, pady=5)
|
||||
self.baud_var = StringVar(value=str(self.settings.get("baudrate")))
|
||||
ttk.Entry(frame, textvariable=self.baud_var, width=10).grid(row=1, column=1, sticky=tk.W, padx=5, pady=5)
|
||||
|
||||
ttk.Label(frame, text="Файл конфигурации:").grid(row=2, column=0, sticky=tk.E, padx=5, pady=5)
|
||||
self.config_var = StringVar(value=self.settings.get("config_file") or "")
|
||||
ttk.Entry(frame, textvariable=self.config_var, width=40).grid(row=2, column=1, sticky=tk.W, padx=5, pady=5)
|
||||
ttk.Button(frame, text="Выбрать", command=self.select_config_file).grid(row=2, column=2, padx=5, pady=5)
|
||||
|
||||
ttk.Label(frame, text="Логин:").grid(row=3, column=0, sticky=tk.E, padx=5, pady=5)
|
||||
self.login_var = StringVar(value=self.settings.get("login") or "")
|
||||
ttk.Entry(frame, textvariable=self.login_var, width=20).grid(row=3, column=1, sticky=tk.W, padx=5, pady=5)
|
||||
|
||||
ttk.Label(frame, text="Пароль:").grid(row=4, column=0, sticky=tk.E, padx=5, pady=5)
|
||||
self.password_var = StringVar(value=self.settings.get("password") or "")
|
||||
ttk.Entry(frame, textvariable=self.password_var, show="*", width=20).grid(row=4, column=1, sticky=tk.W, padx=5, pady=5)
|
||||
|
||||
ttk.Label(frame, text="Timeout (сек):").grid(row=5, column=0, sticky=tk.E, padx=5, pady=5)
|
||||
self.timeout_var = StringVar(value=str(self.settings.get("timeout")))
|
||||
ttk.Entry(frame, textvariable=self.timeout_var, width=10).grid(row=5, column=1, sticky=tk.W, padx=5, pady=5)
|
||||
|
||||
ttk.Label(frame, text="Режим копирования:").grid(row=6, column=0, sticky=tk.E, padx=5, pady=5)
|
||||
self.copy_mode_var = StringVar(value=self.settings.get("copy_mode"))
|
||||
ttk.Radiobutton(frame, text="Построчно", variable=self.copy_mode_var, value="line").grid(row=6, column=1, sticky=tk.W, padx=5)
|
||||
ttk.Radiobutton(frame, text="Блочно", variable=self.copy_mode_var, value="block").grid(row=6, column=1, padx=100, sticky=tk.W)
|
||||
|
||||
ttk.Label(frame, text="Размер блока (строк):").grid(row=7, column=0, sticky=tk.E, padx=5, pady=5)
|
||||
self.block_size_var = StringVar(value=str(self.settings.get("block_size")))
|
||||
ttk.Entry(frame, textvariable=self.block_size_var, width=10).grid(row=7, column=1, sticky=tk.W, padx=5, pady=5)
|
||||
|
||||
ttk.Label(frame, text="Паттерн приглашения:").grid(row=8, column=0, sticky=tk.E, padx=5, pady=5)
|
||||
self.prompt_var = StringVar(value=self.settings.get("prompt"))
|
||||
ttk.Entry(frame, textvariable=self.prompt_var, width=20).grid(row=8, column=1, sticky=tk.W, padx=5, pady=5)
|
||||
|
||||
ttk.Button(frame, text="Сохранить настройки", command=self.save_settings).grid(row=9, column=0, padx=5, pady=10)
|
||||
ttk.Button(frame, text="Проверить соединение", command=self.check_connection).grid(row=9, column=1, padx=5, pady=10)
|
||||
|
||||
def update_ports(self):
|
||||
ports = list_serial_ports()
|
||||
self.port_combo['values'] = ports
|
||||
messagebox.showinfo("Информация", "Список портов обновлен.")
|
||||
|
||||
def select_config_file(self):
|
||||
filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")])
|
||||
if filename:
|
||||
self.config_var.set(filename)
|
||||
|
||||
def save_settings(self):
|
||||
self.settings["port"] = self.port_var.get()
|
||||
try:
|
||||
self.settings["baudrate"] = int(self.baud_var.get())
|
||||
except ValueError:
|
||||
messagebox.showerror("Ошибка", "Некорректное значение baudrate!")
|
||||
return
|
||||
self.settings["config_file"] = self.config_var.get()
|
||||
self.settings["login"] = self.login_var.get()
|
||||
self.settings["password"] = self.password_var.get()
|
||||
try:
|
||||
self.settings["timeout"] = int(self.timeout_var.get())
|
||||
except ValueError:
|
||||
messagebox.showerror("Ошибка", "Некорректное значение timeout!")
|
||||
return
|
||||
self.settings["copy_mode"] = self.copy_mode_var.get()
|
||||
try:
|
||||
self.settings["block_size"] = int(self.block_size_var.get())
|
||||
except ValueError:
|
||||
messagebox.showerror("Ошибка", "Некорректное значение размера блока!")
|
||||
return
|
||||
self.settings["prompt"] = self.prompt_var.get()
|
||||
settings_save(self.settings)
|
||||
messagebox.showinfo("Информация", "Настройки сохранены.")
|
||||
|
||||
def check_connection(self):
|
||||
if not self.settings.get("port"):
|
||||
messagebox.showerror("Ошибка", "COM-порт не выбран!")
|
||||
return
|
||||
conn = create_connection(self.settings)
|
||||
if conn:
|
||||
messagebox.showinfo("Информация", "Соединение установлено успешно!")
|
||||
conn.close()
|
||||
else:
|
||||
messagebox.showerror("Ошибка", "Не удалось установить соединение.")
|
||||
|
||||
# -------------- Вкладка "Интерактивный режим" --------------
|
||||
def create_interactive_tab(self, frame):
|
||||
control_frame = ttk.Frame(frame)
|
||||
control_frame.pack(fill=X, pady=5)
|
||||
ttk.Button(control_frame, text="Подключиться", command=self.connect_device).pack(side=LEFT, padx=5)
|
||||
ttk.Button(control_frame, text="Отключиться", command=self.disconnect_device).pack(side=LEFT, padx=5)
|
||||
|
||||
self.interactive_text = tk.Text(frame, wrap="word", height=20)
|
||||
self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||||
|
||||
input_frame = ttk.Frame(frame)
|
||||
input_frame.pack(fill=X, pady=5)
|
||||
ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5)
|
||||
self.command_entry = ttk.Entry(input_frame, width=50)
|
||||
self.command_entry.pack(side=LEFT, padx=5)
|
||||
ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5)
|
||||
|
||||
def connect_device(self):
|
||||
if self.connection:
|
||||
messagebox.showinfo("Информация", "Уже подключено.")
|
||||
return
|
||||
if not self.settings.get("port"):
|
||||
messagebox.showerror("Ошибка", "COM-порт не выбран!")
|
||||
return
|
||||
self.connection = create_connection(self.settings)
|
||||
if self.connection:
|
||||
self.append_interactive_text("[INFO] Подключение установлено.\n")
|
||||
else:
|
||||
self.append_interactive_text("[ERROR] Не удалось установить соединение.\n")
|
||||
|
||||
def disconnect_device(self):
|
||||
if self.connection:
|
||||
try:
|
||||
self.connection.close()
|
||||
except Exception:
|
||||
pass
|
||||
self.connection = None
|
||||
self.append_interactive_text("[INFO] Соединение закрыто.\n")
|
||||
else:
|
||||
messagebox.showinfo("Информация", "Соединение не установлено.")
|
||||
|
||||
def send_command(self):
|
||||
if not self.connection:
|
||||
messagebox.showerror("Ошибка", "Сначала установите соединение!")
|
||||
return
|
||||
cmd = self.command_entry.get().strip()
|
||||
if not cmd:
|
||||
return
|
||||
self.append_interactive_text(f"[INFO] Отправка команды: {cmd}\n")
|
||||
threading.Thread(target=self.process_command, args=(cmd,), daemon=True).start()
|
||||
|
||||
def process_command(self, cmd):
|
||||
try:
|
||||
self.connection.write((cmd + "\n").encode())
|
||||
logging.info(f"Отправлена команда: {cmd}")
|
||||
response = read_response(
|
||||
self.connection,
|
||||
self.settings.get("timeout", 10),
|
||||
login=self.settings.get("login"),
|
||||
password=self.settings.get("password"),
|
||||
is_gui=True,
|
||||
)
|
||||
if response:
|
||||
self.append_interactive_text(f"[INFO] Ответ устройства:\n{response}\n")
|
||||
logging.info(f"Получен ответ:\n{response}")
|
||||
else:
|
||||
self.append_interactive_text("[WARN] Ответ не получен.\n")
|
||||
logging.warning("Нет ответа от устройства в течение таймаута.")
|
||||
except SerialException as e:
|
||||
self.append_interactive_text(f"[ERROR] Ошибка при отправке команды: {e}\n")
|
||||
logging.error(f"Ошибка отправки команды: {e}", exc_info=True)
|
||||
except Exception as e:
|
||||
self.append_interactive_text(f"[ERROR] Неизвестная ошибка: {e}\n")
|
||||
logging.error(f"Неизвестная ошибка: {e}", exc_info=True)
|
||||
|
||||
def append_interactive_text(self, text):
|
||||
self.interactive_text.insert(END, text)
|
||||
self.interactive_text.see(END)
|
||||
|
||||
# -------------- Вкладка "Выполнить команды из файла" --------------
|
||||
def create_file_exec_tab(self, frame):
|
||||
file_frame = ttk.Frame(frame)
|
||||
file_frame.pack(fill=X, pady=5)
|
||||
ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
|
||||
self.file_exec_var = StringVar(value=self.settings.get("config_file") or "")
|
||||
ttk.Entry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5)
|
||||
ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5)
|
||||
ttk.Button(frame, text="Выполнить команды", command=self.execute_file_commands).pack(pady=5)
|
||||
self.file_exec_text = tk.Text(frame, wrap="word", height=15)
|
||||
self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||||
|
||||
def select_config_file_fileexec(self):
|
||||
filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")])
|
||||
if filename:
|
||||
self.file_exec_var.set(filename)
|
||||
|
||||
def execute_file_commands(self):
|
||||
if not self.settings.get("port"):
|
||||
messagebox.showerror("Ошибка", "COM-порт не выбран!")
|
||||
return
|
||||
if not self.file_exec_var.get():
|
||||
messagebox.showerror("Ошибка", "Файл конфигурации не выбран!")
|
||||
return
|
||||
if not self.connection:
|
||||
self.connection = create_connection(self.settings)
|
||||
if not self.connection:
|
||||
self.append_file_exec_text("[ERROR] Не удалось установить соединение.\n")
|
||||
return
|
||||
threading.Thread(
|
||||
target=execute_commands_from_file,
|
||||
args=(
|
||||
self.connection,
|
||||
self.file_exec_var.get(),
|
||||
self.settings.get("timeout", 10),
|
||||
self.settings.get("copy_mode", "line"),
|
||||
self.settings.get("block_size", 15),
|
||||
self.append_file_exec_text,
|
||||
self.settings.get("login"),
|
||||
self.settings.get("password"),
|
||||
True,
|
||||
),
|
||||
daemon=True,
|
||||
).start()
|
||||
|
||||
def append_file_exec_text(self, text):
|
||||
self.file_exec_text.insert(END, text)
|
||||
self.file_exec_text.see(END)
|
||||
|
||||
# -------------- Вкладка "Редактор конфигурационного файла" --------------
|
||||
def create_config_editor_tab(self, frame):
|
||||
top_frame = ttk.Frame(frame)
|
||||
top_frame.pack(fill=X, pady=5)
|
||||
ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
|
||||
self.editor_file_var = StringVar(value=self.settings.get("config_file") or "")
|
||||
ttk.Entry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5)
|
||||
ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5)
|
||||
ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5)
|
||||
ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5)
|
||||
self.config_editor_text = tk.Text(frame, wrap="word")
|
||||
self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||||
|
||||
def select_config_file_editor(self):
|
||||
filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")])
|
||||
if filename:
|
||||
self.editor_file_var.set(filename)
|
||||
self.settings["config_file"] = filename
|
||||
settings_save(self.settings)
|
||||
|
||||
def load_config_file(self):
|
||||
filename = self.editor_file_var.get()
|
||||
if not filename or not os.path.exists(filename):
|
||||
messagebox.showerror("Ошибка", "Файл конфигурации не выбран или не существует.")
|
||||
return
|
||||
try:
|
||||
with open(filename, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
self.config_editor_text.delete("1.0", END)
|
||||
self.config_editor_text.insert(END, content)
|
||||
messagebox.showinfo("Информация", "Файл загружен.")
|
||||
except Exception as e:
|
||||
logging.error(f"Ошибка загрузки файла: {e}", exc_info=True)
|
||||
messagebox.showerror("Ошибка", f"Не удалось загрузить файл:\n{e}")
|
||||
|
||||
def save_config_file(self):
|
||||
filename = self.editor_file_var.get()
|
||||
if not filename:
|
||||
messagebox.showerror("Ошибка", "Файл конфигурации не выбран.")
|
||||
return
|
||||
try:
|
||||
content = self.config_editor_text.get("1.0", END)
|
||||
with open(filename, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
messagebox.showinfo("Информация", "Файл сохранён.")
|
||||
except Exception as e:
|
||||
logging.error(f"Ошибка сохранения файла: {e}", exc_info=True)
|
||||
messagebox.showerror("Ошибка", f"Не удалось сохранить файл:\n{e}")
|
||||
|
||||
# -------------- Вкладка "TFTP Сервер" --------------
|
||||
def create_tftp_tab(self, frame):
|
||||
ip_frame = ttk.Frame(frame)
|
||||
ip_frame.pack(fill=X, pady=2)
|
||||
ttk.Label(ip_frame, text="Слушать на IP:").pack(side=LEFT, padx=5)
|
||||
self.tftp_listen_ip_var = StringVar(value="0.0.0.0")
|
||||
ttk.Entry(ip_frame, textvariable=self.tftp_listen_ip_var, width=15).pack(side=LEFT, padx=5)
|
||||
|
||||
port_frame = ttk.Frame(frame)
|
||||
port_frame.pack(fill=X, pady=2)
|
||||
ttk.Label(port_frame, text="Порт:").pack(side=LEFT, padx=5)
|
||||
self.tftp_port_var = StringVar(value="6969")
|
||||
ttk.Entry(port_frame, textvariable=self.tftp_port_var, width=10).pack(side=LEFT, padx=5)
|
||||
|
||||
folder_frame = ttk.Frame(frame)
|
||||
folder_frame.pack(fill=X, pady=2)
|
||||
ttk.Label(folder_frame, text="Папка прошивок:").pack(side=LEFT, padx=5)
|
||||
self.firmware_folder_var = StringVar(value="Firmware")
|
||||
ttk.Label(folder_frame, textvariable=self.firmware_folder_var).pack(side=LEFT, padx=5)
|
||||
ttk.Button(folder_frame, text="Открыть папку", command=lambda: os.startfile(os.path.abspath("Firmware"))).pack(side=LEFT, padx=5)
|
||||
|
||||
button_frame = ttk.Frame(frame)
|
||||
button_frame.pack(fill=X, pady=2)
|
||||
ttk.Button(button_frame, text="Запустить TFTP сервер", command=self.start_tftp_server).pack(side=LEFT, padx=5)
|
||||
ttk.Button(button_frame, text="Остановить TFTP сервер", command=self.stop_tftp_server).pack(side=LEFT, padx=5)
|
||||
|
||||
self.tftp_console = tk.Text(frame, wrap="word", height=15)
|
||||
self.tftp_console.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||||
|
||||
def append_tftp_console(self, text):
|
||||
self.tftp_console.insert(END, text)
|
||||
self.tftp_console.see(END)
|
||||
|
||||
def start_tftp_server(self):
|
||||
listen_ip = self.tftp_listen_ip_var.get()
|
||||
try:
|
||||
port = int(self.tftp_port_var.get())
|
||||
except ValueError:
|
||||
messagebox.showerror("Ошибка", "Некорректное значение порта.")
|
||||
return
|
||||
self.tftp_thread = TFTPServerThread(listen_ip, port, self.append_tftp_console)
|
||||
self.tftp_thread.start()
|
||||
|
||||
def stop_tftp_server(self):
|
||||
if self.tftp_thread:
|
||||
self.tftp_thread.stop()
|
||||
self.tftp_thread.join()
|
||||
self.tftp_thread = None
|
||||
self.append_tftp_console("TFTP сервер остановлен.\n")
|
||||
|
||||
# ==========================
|
||||
# Парсер аргументов (не используется)
|
||||
# ==========================
|
||||
# def parse_arguments():
|
||||
# parser = argparse.ArgumentParser(
|
||||
# description="Программа для работы с устройствами через последовательный порт с графическим интерфейсом."
|
||||
# )
|
||||
# parser.add_argument("--cli", action="store_true", help="Запустить в режиме командной строки (без графики)")
|
||||
# return parser.parse_args()
|
||||
|
||||
|
||||
|
||||
# ==========================
|
||||
# Режим командной строки (не используется)
|
||||
# ==========================
|
||||
|
||||
# def run_cli_mode(settings):
|
||||
# print("Запущен режим командной строки.")
|
||||
# while True:
|
||||
# print("\n--- Главное меню ---")
|
||||
# print("1. Подключиться и войти в интерактивный режим")
|
||||
# print("2. Выполнить команды из файла конфигурации")
|
||||
# print("0. Выход")
|
||||
# choice = input("Выберите действие: ").strip()
|
||||
# if choice == "1":
|
||||
# if not settings.get("port"):
|
||||
# print("[ERROR] COM-порт не выбран!")
|
||||
# continue
|
||||
# conn = create_connection(settings)
|
||||
# if not conn:
|
||||
# print("[ERROR] Не удалось установить соединение.")
|
||||
# continue
|
||||
# print("[INFO] Введите команды (для выхода введите _exit):")
|
||||
# while True:
|
||||
# cmd = input("Команда: ")
|
||||
# if cmd.strip().lower() == "_exit":
|
||||
# break
|
||||
# try:
|
||||
# conn.write((cmd + "\n").encode())
|
||||
# response = read_response(
|
||||
# conn, settings.get("timeout", 10),
|
||||
# login=settings.get("login"),
|
||||
# password=settings.get("password"),
|
||||
# is_gui=False
|
||||
# )
|
||||
# print(response)
|
||||
# except Exception as e:
|
||||
# print(f"[ERROR] {e}")
|
||||
# break
|
||||
# conn.close()
|
||||
# elif choice == "2":
|
||||
# if not settings.get("config_file"):
|
||||
# print("[ERROR] Файл конфигурации не выбран!")
|
||||
# continue
|
||||
# if not settings.get("port"):
|
||||
# print("[ERROR] COM-порт не выбран!")
|
||||
# continue
|
||||
# conn = create_connection(settings)
|
||||
# if conn:
|
||||
# execute_commands_from_file(
|
||||
# conn,
|
||||
# settings["config_file"],
|
||||
# settings.get("timeout", 10),
|
||||
# settings.get("copy_mode", "line"),
|
||||
# settings.get("block_size", 15),
|
||||
# lambda msg: print(msg),
|
||||
# login=settings.get("login"),
|
||||
# password=settings.get("password"),
|
||||
# is_gui=False,
|
||||
# )
|
||||
# conn.close()
|
||||
# else:
|
||||
# print("[ERROR] Не удалось установить соединение.")
|
||||
# elif choice == "0":
|
||||
# break
|
||||
# else:
|
||||
# print("[ERROR] Некорректный выбор.")
|
||||
|
||||
|
||||
# ==========================
|
||||
# Основной запуск приложения
|
||||
# ==========================
|
||||
def main():
|
||||
setup_logging()
|
||||
settings = settings_load()
|
||||
app = SerialAppGUI(settings)
|
||||
app.mainloop()
|
||||
|
||||
# ==========================
|
||||
# Основной запуск приложения
|
||||
# ==========================
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Программа прервана пользователем (KeyboardInterrupt).")
|
||||
sys.exit(0)
|
||||
except Exception as e:
|
||||
logging.critical(f"Неизвестная ошибка: {e}", exc_info=True)
|
||||
sys.exit(1)
|
||||
37
ComConfigCopy.pyproj
Normal file
37
ComConfigCopy.pyproj
Normal file
@@ -0,0 +1,37 @@
|
||||
<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="4.0">
|
||||
<PropertyGroup>
|
||||
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
|
||||
<SchemaVersion>2.0</SchemaVersion>
|
||||
<ProjectGuid>5a5ba42b-743e-401d-ac33-9abfe8f095a1</ProjectGuid>
|
||||
<ProjectHome>.</ProjectHome>
|
||||
<StartupFile>ComConfigCopy.py</StartupFile>
|
||||
<SearchPath>
|
||||
</SearchPath>
|
||||
<WorkingDirectory>.</WorkingDirectory>
|
||||
<OutputPath>.</OutputPath>
|
||||
<Name>ComConfigCopy</Name>
|
||||
<RootNamespace>ComConfigCopy</RootNamespace>
|
||||
<TestFramework>Pytest</TestFramework>
|
||||
</PropertyGroup>
|
||||
<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
|
||||
<DebugSymbols>true</DebugSymbols>
|
||||
<EnableUnmanagedDebugging>false</EnableUnmanagedDebugging>
|
||||
</PropertyGroup>
|
||||
<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
|
||||
<DebugSymbols>true</DebugSymbols>
|
||||
<EnableUnmanagedDebugging>false</EnableUnmanagedDebugging>
|
||||
</PropertyGroup>
|
||||
<ItemGroup>
|
||||
<Compile Include="ComConfigCopy.py" />
|
||||
<Compile Include="test.py" />
|
||||
</ItemGroup>
|
||||
<Import Project="$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)\Python Tools\Microsoft.PythonTools.targets" />
|
||||
<!-- Uncomment the CoreCompile target to enable the Build command in
|
||||
Visual Studio and specify your pre- and post-build commands in
|
||||
the BeforeBuild and AfterBuild targets below. -->
|
||||
<!--<Target Name="CoreCompile" />-->
|
||||
<Target Name="BeforeBuild">
|
||||
</Target>
|
||||
<Target Name="AfterBuild">
|
||||
</Target>
|
||||
</Project>
|
||||
BIN
output/ComConfigCopy.exe
Normal file
BIN
output/ComConfigCopy.exe
Normal file
Binary file not shown.
Reference in New Issue
Block a user