- Enhance server stop mechanism with more robust socket and thread management - Add better handling of active transfers during server shutdown - Implement additional safety checks and timeout handling - Improve logging and error reporting for server stop process - Prevent potential deadlocks and resource leaks during server termination
1143 lines
54 KiB
Python
1143 lines
54 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
# ------------------------------------------------------------
|
||
# Это программа для копирования конфигураций на коммутаторы
|
||
# ------------------------------------------------------------
|
||
|
||
|
||
# import argparse Использовался для получения аргументов из командной строки (не используется)
|
||
# import platform Использовался для получения списка сетевых адаптеров (не используется)
|
||
# import subprocess Использовался для получения списка сетевых адаптеров (не используется)
|
||
# import socket не используется
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import sys
|
||
import threading
|
||
import time
|
||
from getpass import getpass
|
||
from logging.handlers import RotatingFileHandler
|
||
import tkinter as tk
|
||
from tkinter import (
|
||
StringVar,
|
||
END,
|
||
BOTH,
|
||
LEFT,
|
||
X,
|
||
W,
|
||
filedialog,
|
||
messagebox,
|
||
simpledialog,
|
||
)
|
||
from tkinter import ttk
|
||
|
||
import serial
|
||
import serial.tools.list_ports
|
||
from serial.serialutil import SerialException
|
||
from about_window import AboutWindow
|
||
from TFTPServer import TFTPServer
|
||
# from TFTPServer import TFTPServerThread
|
||
|
||
# Создаем необходимые папки
|
||
os.makedirs("Logs", exist_ok=True)
|
||
os.makedirs("Configs", exist_ok=True)
|
||
os.makedirs("Settings", exist_ok=True)
|
||
os.makedirs("Firmware", exist_ok=True)
|
||
|
||
# Файл настроек находится в папке Settings
|
||
SETTINGS_FILE = os.path.join("Settings", "settings.json")
|
||
|
||
# ==========================
|
||
# Функции работы с настройками и логированием
|
||
# ==========================
|
||
|
||
def setup_logging():
|
||
"""Настройка логирования с использованием RotatingFileHandler."""
|
||
logger = logging.getLogger()
|
||
logger.setLevel(logging.DEBUG)
|
||
log_path = os.path.join("Logs", "app.log")
|
||
handler = RotatingFileHandler(
|
||
log_path, maxBytes=5 * 1024 * 1024, backupCount=3, encoding="utf-8"
|
||
)
|
||
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
|
||
handler.setFormatter(formatter)
|
||
logger.addHandler(handler)
|
||
|
||
def settings_load():
|
||
"""Загрузка настроек из JSON-файла или создание настроек по умолчанию."""
|
||
default_settings = {
|
||
"port": None, # Порт для подключения
|
||
"baudrate": 9600, # Скорость передачи данных
|
||
"config_file": None, # Файл конфигурации
|
||
"login": None, # Логин для подключения
|
||
"password": None, # Пароль для подключения
|
||
"timeout": 10, # Таймаут подключения
|
||
"copy_mode": "line", # Режим копирования
|
||
"block_size": 15, # Размер блока команд
|
||
"prompt": ">", # Используется для определения приглашения
|
||
}
|
||
|
||
# Создаем папку Settings, если её нет
|
||
os.makedirs("Settings", exist_ok=True)
|
||
|
||
if not os.path.exists(SETTINGS_FILE):
|
||
try:
|
||
# При первом запуске создаем новый файл настроек
|
||
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(default_settings, f, indent=4, ensure_ascii=False)
|
||
logging.info("Файл настроек создан с настройками по умолчанию.")
|
||
return default_settings
|
||
except Exception as e:
|
||
logging.error(f"Ошибка при создании файла настроек: {e}", exc_info=True)
|
||
return default_settings
|
||
|
||
try:
|
||
# Пытаемся загрузить существующие настройки
|
||
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
|
||
settings = json.load(f)
|
||
|
||
# Проверяем наличие всех необходимых параметров
|
||
settings_changed = False
|
||
for key, value in default_settings.items():
|
||
if key not in settings:
|
||
settings[key] = value
|
||
settings_changed = True
|
||
|
||
# Если были добавлены новые параметры, сохраняем обновленные настройки
|
||
if settings_changed:
|
||
try:
|
||
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(settings, f, indent=4, ensure_ascii=False)
|
||
logging.info("Файл настроек обновлен с новыми параметрами.")
|
||
except Exception as e:
|
||
logging.error(f"Ошибка при обновлении файла настроек: {e}", exc_info=True)
|
||
|
||
return settings
|
||
except Exception as e:
|
||
logging.error(f"Ошибка загрузки файла настроек: {e}", exc_info=True)
|
||
return default_settings
|
||
|
||
def settings_save(settings):
|
||
"""Сохранение настроек в JSON-файл."""
|
||
try:
|
||
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(settings, f, indent=4, ensure_ascii=False)
|
||
logging.info("Настройки сохранены в файл.")
|
||
except Exception as e:
|
||
logging.error(f"Ошибка при сохранении настроек: {e}", exc_info=True)
|
||
|
||
def list_serial_ports():
|
||
"""Получение списка доступных последовательных портов."""
|
||
ports = serial.tools.list_ports.comports()
|
||
logging.debug(f"Найдено {len(ports)} серийных портов.")
|
||
return [port.device for port in ports]
|
||
|
||
# ==========================
|
||
# Функции работы с сетевыми адаптерами (не используются)
|
||
# ==========================
|
||
|
||
# def list_network_adapters():
|
||
# """Возвращает список названий сетевых адаптеров (Windows)."""
|
||
# adapters = []
|
||
# if platform.system() == "Windows":
|
||
# try:
|
||
# output = subprocess.check_output(
|
||
# 'wmic nic get NetConnectionID',
|
||
# shell=True,
|
||
# encoding="cp866"
|
||
# )
|
||
# for line in output.splitlines():
|
||
# line = line.strip()
|
||
# if line and line != "NetConnectionID":
|
||
# adapters.append(line)
|
||
# except Exception as e:
|
||
# logging.error(f"Ошибка при получении списка адаптеров: {e}", exc_info=True)
|
||
# else:
|
||
# adapters = ["eth0"]
|
||
# return adapters
|
||
|
||
# ==========================
|
||
# Функции работы с COM-соединением
|
||
# ==========================
|
||
|
||
def create_connection(settings):
|
||
"""Создание соединения с устройством через последовательный порт."""
|
||
try:
|
||
conn = serial.Serial(settings["port"], settings["baudrate"], timeout=1)
|
||
logging.info(f"Соединение установлено с {settings['port']} на {settings['baudrate']}.")
|
||
time.sleep(1)
|
||
return conn
|
||
except SerialException as e:
|
||
logging.error(f"Ошибка подключения: {e}", exc_info=True)
|
||
except Exception as e:
|
||
logging.error(f"Неизвестная ошибка подключения: {e}", exc_info=True)
|
||
return None
|
||
|
||
|
||
|
||
# Проверка наличия логина и пароля в настройках и отправка их на устройство
|
||
|
||
def send_login_password(serial_connection, login=None, password=None, is_gui=False):
|
||
"""Отправка логина и пароля на устройство."""
|
||
if not login:
|
||
if is_gui:
|
||
login = simpledialog.askstring("Login", "Введите логин:")
|
||
if login is None:
|
||
login = ""
|
||
else:
|
||
login = input("Введите логин: ")
|
||
|
||
if not password:
|
||
if is_gui:
|
||
password = simpledialog.askstring("Password", "Введите пароль:", show="*")
|
||
if password is None:
|
||
password = ""
|
||
else:
|
||
password = getpass("Введите пароль: ")
|
||
|
||
|
||
# Чтение ответа от устройства с учётом таймаута.
|
||
|
||
def read_response(serial_connection, timeout, login=None, password=None, is_gui=False):
|
||
"""
|
||
Чтение ответа от устройства с учётом таймаута.
|
||
Если в последней строке появляется запрос логина или пароля, данные отправляются автоматически.
|
||
"""
|
||
response = b""
|
||
end_time = time.time() + timeout
|
||
decoded = ""
|
||
while time.time() < end_time:
|
||
if serial_connection.in_waiting:
|
||
chunk = serial_connection.read(serial_connection.in_waiting)
|
||
response += chunk
|
||
|
||
if b"--More--" in response:
|
||
serial_connection.write(b"\n")
|
||
response = response.replace(b"--More--", b"")
|
||
|
||
try:
|
||
decoded = response.decode(errors="ignore")
|
||
except Exception:
|
||
decoded = ""
|
||
lines = decoded.rstrip().splitlines()
|
||
if lines:
|
||
last_line = lines[-1].strip()
|
||
|
||
if re.search(r'(login:|username:)$', last_line, re.IGNORECASE):
|
||
send_login_password(serial_connection, login, None, is_gui)
|
||
response = b""
|
||
continue
|
||
|
||
if re.search(r'(password:)$', last_line, re.IGNORECASE):
|
||
send_login_password(serial_connection, None, password, is_gui)
|
||
response = b""
|
||
continue
|
||
|
||
if last_line.endswith(">") or last_line.endswith("#"):
|
||
break
|
||
else:
|
||
time.sleep(0.1)
|
||
return decoded
|
||
|
||
def generate_command_blocks(lines, block_size):
|
||
"""Генерация блоков команд для блочного копирования."""
|
||
blocks = []
|
||
current_block = []
|
||
for line in lines:
|
||
trimmed = line.strip()
|
||
if not trimmed:
|
||
continue
|
||
lower_line = trimmed.lower()
|
||
if lower_line.startswith("vlan") or lower_line.startswith("enable") or lower_line.startswith("interface"):
|
||
if current_block:
|
||
blocks.append("\n".join(current_block))
|
||
current_block = []
|
||
blocks.append(trimmed)
|
||
elif lower_line.startswith("exit"):
|
||
current_block.append(trimmed)
|
||
blocks.append("\n".join(current_block))
|
||
current_block = []
|
||
else:
|
||
current_block.append(trimmed)
|
||
if len(current_block) >= block_size:
|
||
blocks.append("\n".join(current_block))
|
||
current_block = []
|
||
if current_block:
|
||
blocks.append("\n".join(current_block))
|
||
return blocks
|
||
|
||
def execute_commands_from_file(
|
||
serial_connection,
|
||
filename,
|
||
timeout,
|
||
copy_mode,
|
||
block_size,
|
||
log_callback=None,
|
||
login=None,
|
||
password=None,
|
||
is_gui=False,
|
||
):
|
||
"""
|
||
Выполнение команд из файла конфигурации.
|
||
Если передан log_callback, вывод будет отображаться в GUI.
|
||
Теперь при обнаружении ошибки (например, если в ответе присутствует символ '^')
|
||
команда будет отправляться повторно.
|
||
"""
|
||
try:
|
||
with open(filename, "r", encoding="utf-8") as file:
|
||
lines = [line for line in file if line.strip()]
|
||
msg = f"Выполнение команд из файла: {filename}\n"
|
||
logging.info(msg)
|
||
if log_callback:
|
||
log_callback(msg)
|
||
# Если выбран построчный режим
|
||
if copy_mode == "line":
|
||
for cmd in lines:
|
||
cmd = cmd.strip()
|
||
max_attempts = 3
|
||
attempt = 0
|
||
while attempt < max_attempts:
|
||
msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
serial_connection.write((cmd + "\n").encode())
|
||
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})")
|
||
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
|
||
if response:
|
||
if '^' in response:
|
||
msg = (
|
||
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
|
||
f"Ответ устройства:\n{response}\n"
|
||
f"Повторная отправка команды...\n"
|
||
)
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.")
|
||
attempt += 1
|
||
time.sleep(1)
|
||
continue
|
||
else:
|
||
msg = f"Ответ устройства:\n{response}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.info(f"Ответ устройства:\n{response}")
|
||
break
|
||
else:
|
||
msg = f"Ответ не получен для команды: {cmd}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.warning(f"Нет ответа для команды: {cmd}")
|
||
break
|
||
if attempt == max_attempts:
|
||
msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.error(msg)
|
||
time.sleep(1)
|
||
# Если выбран блочный режим
|
||
elif copy_mode == "block":
|
||
blocks = generate_command_blocks(lines, block_size)
|
||
for block in blocks:
|
||
msg = f"\nОтправка блока команд:\n{block}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
serial_connection.write((block + "\n").encode())
|
||
logging.info(f"Отправлен блок команд:\n{block}")
|
||
response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
|
||
# Если обнаружена ошибка в ответе на блок, отправляем команды по очереди
|
||
if response and '^' in response:
|
||
msg = (
|
||
f"[WARNING] Обнаружена ошибка при выполнении блока команд.\n"
|
||
f"Ответ устройства:\n{response}\n"
|
||
f"Пересылаются команды по отдельности...\n"
|
||
)
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.warning("Ошибка в блочном режиме – отправляем команды индивидуально.")
|
||
for line in block.splitlines():
|
||
cmd = line.strip()
|
||
if not cmd:
|
||
continue
|
||
max_attempts = 3
|
||
attempt = 0
|
||
while attempt < max_attempts:
|
||
sub_msg = f"\nОтправка команды: {cmd} (Попытка {attempt+1} из {max_attempts})\n"
|
||
if log_callback:
|
||
log_callback(sub_msg)
|
||
serial_connection.write((cmd + "\n").encode())
|
||
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})")
|
||
sub_response = read_response(serial_connection, timeout, login=login, password=password, is_gui=is_gui)
|
||
if sub_response:
|
||
if '^' in sub_response:
|
||
sub_msg = (
|
||
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
|
||
f"Ответ устройства:\n{sub_response}\n"
|
||
f"Повторная отправка команды...\n"
|
||
)
|
||
if log_callback:
|
||
log_callback(sub_msg)
|
||
logging.warning(f"Ошибка в команде: {cmd}. Повторная отправка.")
|
||
attempt += 1
|
||
time.sleep(1)
|
||
continue
|
||
else:
|
||
sub_msg = f"Ответ устройства:\n{sub_response}\n"
|
||
if log_callback:
|
||
log_callback(sub_msg)
|
||
logging.info(f"Ответ устройства:\n{sub_response}")
|
||
break
|
||
else:
|
||
sub_msg = f"Ответ не получен для команды: {cmd}\n"
|
||
if log_callback:
|
||
log_callback(sub_msg)
|
||
logging.warning(f"Нет ответа для команды: {cmd}")
|
||
break
|
||
if attempt == max_attempts:
|
||
sub_msg = f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n"
|
||
if log_callback:
|
||
log_callback(sub_msg)
|
||
logging.error(sub_msg)
|
||
time.sleep(1)
|
||
else:
|
||
if response:
|
||
msg = f"Ответ устройства:\n{response}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.info(f"Ответ устройства:\n{response}")
|
||
else:
|
||
msg = f"Ответ не получен для блока:\n{block}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.warning(f"Нет ответа для блока:\n{block}")
|
||
time.sleep(1)
|
||
except SerialException as e:
|
||
msg = f"Ошибка при выполнении команды: {e}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.error(f"Ошибка при выполнении команды: {e}", exc_info=True)
|
||
except Exception as e:
|
||
msg = f"Ошибка при выполнении команд: {e}\n"
|
||
if log_callback:
|
||
log_callback(msg)
|
||
logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True)
|
||
|
||
# ==========================
|
||
# Графический интерфейс (Tkinter)
|
||
# ==========================
|
||
|
||
class SettingsWindow(tk.Toplevel):
|
||
def __init__(self, parent, settings, callback=None):
|
||
super().__init__(parent)
|
||
self.title("Настройки")
|
||
self.geometry("600x400")
|
||
self.settings = settings
|
||
self.callback = callback
|
||
self.resizable(False, False)
|
||
|
||
# Создаем фрейм для настроек
|
||
settings_frame = ttk.Frame(self, padding="10")
|
||
settings_frame.pack(fill=BOTH, expand=True)
|
||
|
||
# COM порт
|
||
ttk.Label(settings_frame, text="COM порт:").grid(row=0, column=0, sticky=W, pady=5)
|
||
self.port_var = StringVar(value=settings.get("port", ""))
|
||
self.port_combo = ttk.Combobox(settings_frame, textvariable=self.port_var)
|
||
self.port_combo.grid(row=0, column=1, sticky=W, pady=5)
|
||
ttk.Button(settings_frame, text="Обновить", command=self.update_ports).grid(row=0, column=2, padx=5)
|
||
|
||
# Скорость передачи
|
||
ttk.Label(settings_frame, text="Скорость:").grid(row=1, column=0, sticky=W, pady=5)
|
||
self.baudrate_var = StringVar(value=str(settings.get("baudrate", 9600)))
|
||
baudrate_combo = ttk.Combobox(settings_frame, textvariable=self.baudrate_var,
|
||
values=["9600", "19200", "38400", "57600", "115200"])
|
||
baudrate_combo.grid(row=1, column=1, sticky=W, pady=5)
|
||
|
||
# Таймаут
|
||
ttk.Label(settings_frame, text="Таймаут (сек):").grid(row=2, column=0, sticky=W, pady=5)
|
||
self.timeout_var = StringVar(value=str(settings.get("timeout", 10)))
|
||
timeout_entry = ttk.Entry(settings_frame, textvariable=self.timeout_var)
|
||
timeout_entry.grid(row=2, column=1, sticky=W, pady=5)
|
||
|
||
# Режим копирования
|
||
ttk.Label(settings_frame, text="Режим копирования:").grid(row=3, column=0, sticky=W, pady=5)
|
||
self.copy_mode_var = StringVar(value=settings.get("copy_mode", "line"))
|
||
copy_mode_frame = ttk.Frame(settings_frame)
|
||
copy_mode_frame.grid(row=3, column=1, sticky=W, pady=5)
|
||
ttk.Radiobutton(copy_mode_frame, text="Построчно", value="line",
|
||
variable=self.copy_mode_var).pack(side=LEFT)
|
||
ttk.Radiobutton(copy_mode_frame, text="Блоками", value="block",
|
||
variable=self.copy_mode_var).pack(side=LEFT)
|
||
|
||
# Размер блока
|
||
ttk.Label(settings_frame, text="Размер блока:").grid(row=4, column=0, sticky=W, pady=5)
|
||
self.block_size_var = StringVar(value=str(settings.get("block_size", 15)))
|
||
block_size_entry = ttk.Entry(settings_frame, textvariable=self.block_size_var)
|
||
block_size_entry.grid(row=4, column=1, sticky=W, pady=5)
|
||
|
||
# Приглашение командной строки
|
||
ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5)
|
||
self.prompt_var = StringVar(value=settings.get("prompt", ">"))
|
||
prompt_entry = ttk.Entry(settings_frame, textvariable=self.prompt_var)
|
||
prompt_entry.grid(row=5, column=1, sticky=W, pady=5)
|
||
|
||
# Кнопки
|
||
button_frame = ttk.Frame(settings_frame)
|
||
button_frame.grid(row=6, column=0, columnspan=3, pady=20)
|
||
ttk.Button(button_frame, text="Сохранить", command=self.save_settings).pack(side=LEFT, padx=5)
|
||
ttk.Button(button_frame, text="Отмена", command=self.destroy).pack(side=LEFT, padx=5)
|
||
|
||
self.update_ports()
|
||
|
||
# Центрируем окно
|
||
self.center_window()
|
||
|
||
def center_window(self):
|
||
self.update_idletasks()
|
||
width = self.winfo_width()
|
||
height = self.winfo_height()
|
||
x = (self.winfo_screenwidth() // 2) - (width // 2)
|
||
y = (self.winfo_screenheight() // 2) - (height // 2)
|
||
self.geometry(f"{width}x{height}+{x}+{y}")
|
||
|
||
def update_ports(self):
|
||
ports = list_serial_ports()
|
||
self.port_combo["values"] = ports
|
||
if ports and not self.port_var.get():
|
||
self.port_var.set(ports[0])
|
||
|
||
def save_settings(self):
|
||
try:
|
||
self.settings.update({
|
||
"port": self.port_var.get(),
|
||
"baudrate": int(self.baudrate_var.get()),
|
||
"timeout": int(self.timeout_var.get()),
|
||
"copy_mode": self.copy_mode_var.get(),
|
||
"block_size": int(self.block_size_var.get()),
|
||
"prompt": self.prompt_var.get()
|
||
})
|
||
settings_save(self.settings)
|
||
if self.callback:
|
||
self.callback()
|
||
self.destroy()
|
||
messagebox.showinfo("Успех", "Настройки успешно сохранены")
|
||
except ValueError as e:
|
||
messagebox.showerror("Ошибка", "Проверьте правильность введенных значений")
|
||
|
||
class SerialAppGUI(tk.Tk):
|
||
def __init__(self, settings):
|
||
super().__init__()
|
||
self.title("Serial Device Manager")
|
||
self.geometry("900x700")
|
||
self.style = ttk.Style(self)
|
||
self.style.theme_use("clam")
|
||
default_font = ("Segoe UI", 10)
|
||
self.option_add("*Font", default_font)
|
||
self.settings = settings
|
||
self.connection = None
|
||
self.tftp_server = None
|
||
|
||
# Глобальные биндинги
|
||
self.bind_class("Text", "<Control-c>", lambda event: event.widget.event_generate("<<Copy>>"))
|
||
self.bind_class("Text", "<Control-v>", lambda event: event.widget.event_generate("<<Paste>>"))
|
||
self.bind_class("Text", "<Control-x>", lambda event: event.widget.event_generate("<<Cut>>"))
|
||
self.bind_class("Entry", "<Control-c>", lambda event: event.widget.event_generate("<<Copy>>"))
|
||
self.bind_class("Entry", "<Control-v>", lambda event: event.widget.event_generate("<<Paste>>"))
|
||
self.bind_class("Entry", "<Control-x>", lambda event: event.widget.event_generate("<<Cut>>"))
|
||
|
||
self.create_menu()
|
||
self.create_tabs()
|
||
|
||
# Проверка первого запуска
|
||
self.check_first_run()
|
||
|
||
def check_first_run(self):
|
||
"""Проверка первого запуска программы"""
|
||
show_settings = False
|
||
|
||
# Проверяем существование файла настроек
|
||
if not os.path.exists(SETTINGS_FILE):
|
||
show_settings = True
|
||
else:
|
||
# Проверяем содержимое файла настроек
|
||
try:
|
||
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
|
||
settings = json.load(f)
|
||
# Если порт не настроен, считаем это первым запуском
|
||
if settings.get("port") is None:
|
||
show_settings = True
|
||
except Exception:
|
||
# Если файл поврежден или не читается, тоже показываем настройки
|
||
show_settings = True
|
||
|
||
if show_settings:
|
||
# Создаем папку Settings, если её нет
|
||
os.makedirs("Settings", exist_ok=True)
|
||
|
||
response = messagebox.askyesno(
|
||
"Первый запуск",
|
||
"Это первый запуск программы. Хотите настроить параметры подключения сейчас?"
|
||
)
|
||
if response:
|
||
self.open_settings()
|
||
|
||
def create_menu(self):
|
||
menubar = tk.Menu(self)
|
||
self.config(menu=menubar)
|
||
|
||
# Меню "Файл"
|
||
file_menu = tk.Menu(menubar, tearoff=0)
|
||
menubar.add_cascade(label="Файл", menu=file_menu)
|
||
file_menu.add_command(label="Настройки", command=self.open_settings)
|
||
file_menu.add_separator()
|
||
file_menu.add_command(label="Выход", command=self.quit)
|
||
|
||
# Меню "Справка"
|
||
help_menu = tk.Menu(menubar, tearoff=0)
|
||
menubar.add_cascade(label="Справка", menu=help_menu)
|
||
help_menu.add_command(label="О программе", command=self.open_about)
|
||
|
||
def open_settings(self):
|
||
settings_window = SettingsWindow(self, self.settings, self.on_settings_changed)
|
||
settings_window.transient(self)
|
||
settings_window.grab_set()
|
||
|
||
def on_settings_changed(self):
|
||
# Обновляем настройки в основном приложении
|
||
self.settings = settings_load()
|
||
|
||
def create_tabs(self):
|
||
self.notebook = ttk.Notebook(self)
|
||
self.notebook.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
# Создаем вкладки
|
||
interactive_frame = ttk.Frame(self.notebook)
|
||
file_exec_frame = ttk.Frame(self.notebook)
|
||
config_editor_frame = ttk.Frame(self.notebook)
|
||
tftp_frame = ttk.Frame(self.notebook)
|
||
|
||
self.notebook.add(interactive_frame, text="Интерактивный режим")
|
||
self.notebook.add(file_exec_frame, text="Выполнение файла")
|
||
self.notebook.add(config_editor_frame, text="Редактор конфигурации")
|
||
self.notebook.add(tftp_frame, text="TFTP Сервер")
|
||
|
||
self.create_interactive_tab(interactive_frame)
|
||
self.create_file_exec_tab(file_exec_frame)
|
||
self.create_config_editor_tab(config_editor_frame)
|
||
self.create_tftp_tab(tftp_frame)
|
||
|
||
# -------------- Вкладка "Интерактивный режим" --------------
|
||
def create_interactive_tab(self, frame):
|
||
control_frame = ttk.Frame(frame)
|
||
control_frame.pack(fill=X, pady=5)
|
||
ttk.Button(control_frame, text="Подключиться", command=self.connect_device).pack(side=LEFT, padx=5)
|
||
ttk.Button(control_frame, text="Отключиться", command=self.disconnect_device).pack(side=LEFT, padx=5)
|
||
|
||
self.interactive_text = tk.Text(frame, wrap="word", height=20)
|
||
self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
input_frame = ttk.Frame(frame)
|
||
input_frame.pack(fill=X, pady=5)
|
||
ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5)
|
||
self.command_entry = ttk.Entry(input_frame, width=50)
|
||
self.command_entry.pack(side=LEFT, padx=5)
|
||
ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5)
|
||
|
||
def connect_device(self):
|
||
if self.connection:
|
||
messagebox.showinfo("Информация", "Уже подключено.")
|
||
return
|
||
if not self.settings.get("port"):
|
||
messagebox.showerror("Ошибка", "COM-порт не выбран!")
|
||
return
|
||
self.connection = create_connection(self.settings)
|
||
if self.connection:
|
||
self.append_interactive_text("[INFO] Подключение установлено.\n")
|
||
else:
|
||
self.append_interactive_text("[ERROR] Не удалось установить соединение.\n")
|
||
|
||
def disconnect_device(self):
|
||
if self.connection:
|
||
try:
|
||
self.connection.close()
|
||
except Exception:
|
||
pass
|
||
self.connection = None
|
||
self.append_interactive_text("[INFO] Соединение закрыто.\n")
|
||
else:
|
||
messagebox.showinfo("Информация", "Соединение не установлено.")
|
||
|
||
def send_command(self):
|
||
if not self.connection:
|
||
messagebox.showerror("Ошибка", "Сначала установите соединение!")
|
||
return
|
||
cmd = self.command_entry.get().strip()
|
||
if not cmd:
|
||
return
|
||
self.append_interactive_text(f"[INFO] Отправка команды: {cmd}\n")
|
||
threading.Thread(target=self.process_command, args=(cmd,), daemon=True).start()
|
||
|
||
def process_command(self, cmd):
|
||
try:
|
||
max_attempts = 3
|
||
attempt = 0
|
||
while attempt < max_attempts:
|
||
self.connection.write((cmd + "\n").encode())
|
||
logging.info(f"Отправлена команда: {cmd} (Попытка {attempt+1})")
|
||
response = read_response(
|
||
self.connection,
|
||
self.settings.get("timeout", 10),
|
||
login=self.settings.get("login"),
|
||
password=self.settings.get("password"),
|
||
is_gui=True,
|
||
)
|
||
if response:
|
||
if '^' in response:
|
||
self.append_interactive_text(
|
||
f"[ERROR] Обнаружена ошибка при выполнении команды: {cmd}\n"
|
||
f"Ответ устройства:\n{response}\n"
|
||
f"Повторная отправка команды...\n"
|
||
)
|
||
logging.warning(f"Ошибка в команде: {cmd}. Попытка повторной отправки.")
|
||
attempt += 1
|
||
time.sleep(1)
|
||
continue
|
||
else:
|
||
self.append_interactive_text(f"[INFO] Ответ устройства:\n{response}\n")
|
||
logging.info(f"Получен ответ:\n{response}")
|
||
break
|
||
else:
|
||
self.append_interactive_text("[WARN] Ответ не получен.\n")
|
||
logging.warning("Нет ответа от устройства в течение таймаута.")
|
||
break
|
||
if attempt == max_attempts:
|
||
self.append_interactive_text(f"[ERROR] Команда не выполнена корректно после {max_attempts} попыток: {cmd}\n")
|
||
logging.error(f"Команда не выполнена корректно после {max_attempts} попыток: {cmd}")
|
||
except SerialException as e:
|
||
self.append_interactive_text(f"[ERROR] Ошибка при отправке команды: {e}\n")
|
||
logging.error(f"Ошибка отправки команды: {e}", exc_info=True)
|
||
except Exception as e:
|
||
self.append_interactive_text(f"[ERROR] Неизвестная ошибка: {e}\n")
|
||
logging.error(f"Неизвестная ошибка: {e}", exc_info=True)
|
||
|
||
def append_interactive_text(self, text):
|
||
self.interactive_text.insert(END, text)
|
||
self.interactive_text.see(END)
|
||
|
||
# -------------- Вкладка "Выполнить команды из файла" --------------
|
||
def create_file_exec_tab(self, frame):
|
||
file_frame = ttk.Frame(frame)
|
||
file_frame.pack(fill=X, pady=5)
|
||
ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
|
||
self.file_exec_var = StringVar(value=self.settings.get("config_file") or "")
|
||
ttk.Entry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5)
|
||
ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5)
|
||
ttk.Button(frame, text="Выполнить команды", command=self.execute_file_commands).pack(pady=5)
|
||
self.file_exec_text = tk.Text(frame, wrap="word", height=15)
|
||
self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
def select_config_file_fileexec(self):
|
||
filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")])
|
||
if filename:
|
||
self.file_exec_var.set(filename)
|
||
|
||
def execute_file_commands(self):
|
||
if not self.settings.get("port"):
|
||
messagebox.showerror("Ошибка", "COM-порт не выбран!")
|
||
return
|
||
if not self.file_exec_var.get():
|
||
messagebox.showerror("Ошибка", "Файл конфигурации не выбран!")
|
||
return
|
||
if not self.connection:
|
||
self.connection = create_connection(self.settings)
|
||
if not self.connection:
|
||
self.append_file_exec_text("[ERROR] Не удалось установить соединение.\n")
|
||
return
|
||
threading.Thread(
|
||
target=execute_commands_from_file,
|
||
args=(
|
||
self.connection,
|
||
self.file_exec_var.get(),
|
||
self.settings.get("timeout", 10),
|
||
self.settings.get("copy_mode", "line"),
|
||
self.settings.get("block_size", 15),
|
||
self.append_file_exec_text,
|
||
self.settings.get("login"),
|
||
self.settings.get("password"),
|
||
True,
|
||
),
|
||
daemon=True,
|
||
).start()
|
||
|
||
def append_file_exec_text(self, text):
|
||
self.file_exec_text.insert(END, text)
|
||
self.file_exec_text.see(END)
|
||
|
||
# -------------- Вкладка "Редактор конфигурационного файла" --------------
|
||
def create_config_editor_tab(self, frame):
|
||
top_frame = ttk.Frame(frame)
|
||
top_frame.pack(fill=X, pady=5)
|
||
ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
|
||
self.editor_file_var = StringVar(value=self.settings.get("config_file") or "")
|
||
ttk.Entry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5)
|
||
ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5)
|
||
ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5)
|
||
ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5)
|
||
self.config_editor_text = tk.Text(frame, wrap="word")
|
||
self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
def select_config_file_editor(self):
|
||
filename = filedialog.askopenfilename(title="Выберите файл конфигурации", filetypes=[("Text files", "*.txt")])
|
||
if filename:
|
||
self.editor_file_var.set(filename)
|
||
self.settings["config_file"] = filename
|
||
settings_save(self.settings)
|
||
|
||
def load_config_file(self):
|
||
filename = self.editor_file_var.get()
|
||
if not filename or not os.path.exists(filename):
|
||
messagebox.showerror("Ошибка", "Файл конфигурации не выбран или не существует.")
|
||
return
|
||
try:
|
||
with open(filename, "r", encoding="utf-8") as f:
|
||
content = f.read()
|
||
self.config_editor_text.delete("1.0", END)
|
||
self.config_editor_text.insert(END, content)
|
||
messagebox.showinfo("Информация", "Файл загружен.")
|
||
except Exception as e:
|
||
logging.error(f"Ошибка загрузки файла: {e}", exc_info=True)
|
||
messagebox.showerror("Ошибка", f"Не удалось загрузить файл:\n{e}")
|
||
|
||
def save_config_file(self):
|
||
filename = self.editor_file_var.get()
|
||
if not filename:
|
||
messagebox.showerror("Ошибка", "Файл конфигурации не выбран.")
|
||
return
|
||
try:
|
||
content = self.config_editor_text.get("1.0", END)
|
||
with open(filename, "w", encoding="utf-8") as f:
|
||
f.write(content)
|
||
messagebox.showinfo("Информация", "Файл сохранён.")
|
||
except Exception as e:
|
||
logging.error(f"Ошибка сохранения файла: {e}", exc_info=True)
|
||
messagebox.showerror("Ошибка", f"Не удалось сохранить файл:\n{e}")
|
||
|
||
def open_about(self):
|
||
about_window = AboutWindow(self)
|
||
about_window.transient(self)
|
||
about_window.grab_set()
|
||
|
||
def create_tftp_tab(self, frame):
|
||
"""Создание вкладки TFTP сервера."""
|
||
# Создаем фрейм для управления TFTP сервером
|
||
tftp_frame = ttk.Frame(frame)
|
||
tftp_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
# Создаем и размещаем элементы управления
|
||
controls_frame = ttk.LabelFrame(tftp_frame, text="Управление TFTP сервером")
|
||
controls_frame.pack(fill=X, padx=5, pady=5)
|
||
|
||
# IP адрес
|
||
ip_frame = ttk.Frame(controls_frame)
|
||
ip_frame.pack(fill=X, padx=5, pady=2)
|
||
ttk.Label(ip_frame, text="IP адрес:").pack(side=LEFT, padx=5)
|
||
self.tftp_ip_var = StringVar(value="0.0.0.0")
|
||
self.tftp_ip_entry = ttk.Entry(ip_frame, textvariable=self.tftp_ip_var)
|
||
self.tftp_ip_entry.pack(fill=X, expand=True, padx=5)
|
||
|
||
# Порт
|
||
port_frame = ttk.Frame(controls_frame)
|
||
port_frame.pack(fill=X, padx=5, pady=2)
|
||
ttk.Label(port_frame, text="Порт:").pack(side=LEFT, padx=5)
|
||
self.tftp_port_var = StringVar(value="69")
|
||
self.tftp_port_entry = ttk.Entry(port_frame, textvariable=self.tftp_port_var)
|
||
self.tftp_port_entry.pack(fill=X, expand=True, padx=5)
|
||
|
||
# Кнопки управления
|
||
buttons_frame = ttk.Frame(controls_frame)
|
||
buttons_frame.pack(fill=X, padx=5, pady=5)
|
||
|
||
self.start_tftp_button = ttk.Button(
|
||
buttons_frame,
|
||
text="Запустить сервер",
|
||
command=self.start_tftp_server
|
||
)
|
||
self.start_tftp_button.pack(side=LEFT, padx=5)
|
||
|
||
self.stop_tftp_button = ttk.Button(
|
||
buttons_frame,
|
||
text="Остановить сервер",
|
||
command=self.stop_tftp_server,
|
||
state="disabled"
|
||
)
|
||
self.stop_tftp_button.pack(side=LEFT, padx=5)
|
||
|
||
# Лог сервера
|
||
log_frame = ttk.LabelFrame(tftp_frame, text="Лог сервера")
|
||
log_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
self.tftp_log_text = tk.Text(log_frame, wrap=tk.WORD, height=10)
|
||
self.tftp_log_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
# Добавляем скроллбар для лога
|
||
scrollbar = ttk.Scrollbar(self.tftp_log_text, command=self.tftp_log_text.yview)
|
||
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
|
||
self.tftp_log_text.config(yscrollcommand=scrollbar.set)
|
||
|
||
# Статус передач
|
||
transfers_frame = ttk.LabelFrame(tftp_frame, text="Активные передачи")
|
||
transfers_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
# Создаем таблицу для отображения активных передач
|
||
columns = ("client", "filename", "progress", "remaining", "time")
|
||
self.transfers_tree = ttk.Treeview(transfers_frame, columns=columns, show="headings")
|
||
|
||
# Настраиваем заголовки колонок
|
||
self.transfers_tree.heading("client", text="Клиент")
|
||
self.transfers_tree.heading("filename", text="Файл")
|
||
self.transfers_tree.heading("progress", text="Прогресс")
|
||
self.transfers_tree.heading("remaining", text="Осталось")
|
||
self.transfers_tree.heading("time", text="Время")
|
||
|
||
# Настраиваем ширину колонок
|
||
self.transfers_tree.column("client", width=120)
|
||
self.transfers_tree.column("filename", width=150)
|
||
self.transfers_tree.column("progress", width=100)
|
||
self.transfers_tree.column("remaining", width=100)
|
||
self.transfers_tree.column("time", width=80)
|
||
|
||
self.transfers_tree.pack(fill=BOTH, expand=True, padx=5, pady=5)
|
||
|
||
# Инициализация TFTP сервера
|
||
self.tftp_server = None
|
||
self.tftp_server_thread = None
|
||
|
||
def start_tftp_server(self):
|
||
"""Запуск TFTP сервера."""
|
||
try:
|
||
ip = self.tftp_ip_var.get()
|
||
port = int(self.tftp_port_var.get())
|
||
|
||
# Создаем экземпляр TFTP сервера
|
||
self.tftp_server = TFTPServer("Firmware")
|
||
|
||
# Устанавливаем callback для логирования
|
||
def log_callback(message):
|
||
self.append_tftp_log(message)
|
||
# Обновляем информацию о передачах
|
||
self.update_transfers_info()
|
||
|
||
self.tftp_server.set_log_callback(log_callback)
|
||
|
||
# Запускаем сервер в отдельном потоке
|
||
self.tftp_server_thread = threading.Thread(
|
||
target=self.run_tftp_server,
|
||
args=(ip, port),
|
||
daemon=True
|
||
)
|
||
self.tftp_server_thread.start()
|
||
|
||
# Обновляем состояние кнопок
|
||
self.start_tftp_button.config(state="disabled")
|
||
self.stop_tftp_button.config(state="normal")
|
||
self.tftp_ip_entry.config(state="disabled")
|
||
self.tftp_port_entry.config(state="disabled")
|
||
|
||
self.append_tftp_log(f"[INFO] TFTP сервер запущен на {ip}:{port}")
|
||
|
||
# Запускаем периодическое обновление информации о передачах
|
||
self.update_transfers_periodically()
|
||
|
||
except Exception as e:
|
||
self.append_tftp_log(f"[ERROR] Ошибка запуска сервера: {str(e)}")
|
||
messagebox.showerror("Ошибка", f"Не удалось запустить TFTP сервер: {str(e)}")
|
||
|
||
def run_tftp_server(self, ip, port):
|
||
"""Запуск TFTP сервера в отдельном потоке."""
|
||
try:
|
||
self.tftp_server.start_server(ip, port)
|
||
except Exception as e:
|
||
self.append_tftp_log(f"[ERROR] Ошибка работы сервера: {str(e)}")
|
||
|
||
def stop_tftp_server(self):
|
||
"""Остановка TFTP сервера."""
|
||
if self.tftp_server:
|
||
try:
|
||
# Отключаем кнопки на время остановки сервера
|
||
self.start_tftp_button.config(state="disabled")
|
||
self.stop_tftp_button.config(state="disabled")
|
||
|
||
# Останавливаем сервер
|
||
self.tftp_server.stop_server()
|
||
|
||
# Ждем завершения потока сервера с таймаутом
|
||
if self.tftp_server_thread:
|
||
self.tftp_server_thread.join(timeout=5.0)
|
||
if self.tftp_server_thread.is_alive():
|
||
self.append_tftp_log("[WARN] Превышено время ожидания остановки сервера")
|
||
|
||
# Очищаем ссылки на сервер и поток
|
||
self.tftp_server = None
|
||
self.tftp_server_thread = None
|
||
|
||
# Обновляем состояние кнопок
|
||
self.start_tftp_button.config(state="normal")
|
||
self.stop_tftp_button.config(state="disabled")
|
||
self.tftp_ip_entry.config(state="normal")
|
||
self.tftp_port_entry.config(state="normal")
|
||
|
||
self.append_tftp_log("[INFO] TFTP сервер остановлен")
|
||
|
||
# Очищаем таблицу передач
|
||
for item in self.transfers_tree.get_children():
|
||
self.transfers_tree.delete(item)
|
||
|
||
except Exception as e:
|
||
self.append_tftp_log(f"[ERROR] Ошибка остановки сервера: {str(e)}")
|
||
messagebox.showerror("Ошибка", f"Не удалось остановить TFTP сервер: {str(e)}")
|
||
|
||
# Восстанавливаем состояние кнопок в случае ошибки
|
||
self.start_tftp_button.config(state="disabled")
|
||
self.stop_tftp_button.config(state="normal")
|
||
|
||
def append_tftp_log(self, message):
|
||
"""Добавление сообщения в лог TFTP сервера."""
|
||
self.tftp_log_text.insert(END, message + "\n")
|
||
self.tftp_log_text.see(END)
|
||
|
||
def update_transfers_info(self):
|
||
"""Обновление информации об активных передачах."""
|
||
if not self.tftp_server:
|
||
return
|
||
|
||
# Очищаем текущие записи
|
||
for item in self.transfers_tree.get_children():
|
||
self.transfers_tree.delete(item)
|
||
|
||
# Добавляем информацию о текущих передачах
|
||
for client_addr, transfer_info in self.tftp_server.active_transfers.items():
|
||
filename = transfer_info['filename']
|
||
bytes_sent = transfer_info['bytes_sent']
|
||
filesize = transfer_info['filesize']
|
||
start_time = transfer_info['start_time']
|
||
|
||
# Вычисляем прогресс
|
||
progress = f"{bytes_sent}/{filesize} байт"
|
||
remaining = filesize - bytes_sent
|
||
elapsed_time = time.time() - start_time
|
||
|
||
# Добавляем запись в таблицу
|
||
self.transfers_tree.insert("", END, values=(
|
||
f"{client_addr[0]}:{client_addr[1]}",
|
||
filename,
|
||
progress,
|
||
f"{remaining} байт",
|
||
f"{elapsed_time:.1f}с"
|
||
))
|
||
|
||
def update_transfers_periodically(self):
|
||
"""Периодическое обновление информации о передачах."""
|
||
if self.tftp_server and self.tftp_server.running:
|
||
self.update_transfers_info()
|
||
# Планируем следующее обновление через 1 секунду
|
||
self.after(1000, self.update_transfers_periodically)
|
||
|
||
# ==========================
|
||
# Парсер аргументов (не используется)
|
||
# ==========================
|
||
# def parse_arguments():
|
||
# parser = argparse.ArgumentParser(
|
||
# description="Программа для работы с устройствами через последовательный порт с графическим интерфейсом."
|
||
# )
|
||
# parser.add_argument("--cli", action="store_true", help="Запустить в режиме командной строки (без графики)")
|
||
# return parser.parse_args()
|
||
|
||
# ==========================
|
||
# Режим командной строки (не используется)
|
||
# ==========================
|
||
|
||
# def run_cli_mode(settings):
|
||
# print("Запущен режим командной строки.")
|
||
# while True:
|
||
# print("\n--- Главное меню ---")
|
||
# print("1. Подключиться и войти в интерактивный режим")
|
||
# print("2. Выполнить команды из файла конфигурации")
|
||
# print("0. Выход")
|
||
# choice = input("Выберите действие: ").strip()
|
||
# if choice == "1":
|
||
# if not settings.get("port"):
|
||
# print("[ERROR] COM-порт не выбран!")
|
||
# continue
|
||
# conn = create_connection(settings)
|
||
# if not conn:
|
||
# print("[ERROR] Не удалось установить соединение.")
|
||
# continue
|
||
# print("[INFO] Введите команды (для выхода введите _exit):")
|
||
# while True:
|
||
# cmd = input("Команда: ")
|
||
# if cmd.strip().lower() == "_exit":
|
||
# break
|
||
# try:
|
||
# conn.write((cmd + "\n").encode())
|
||
# response = read_response(
|
||
# conn, settings.get("timeout", 10),
|
||
# login=settings.get("login"),
|
||
# password=settings.get("password"),
|
||
# is_gui=False
|
||
# )
|
||
# print(response)
|
||
# except Exception as e:
|
||
# print(f"[ERROR] {e}")
|
||
# break
|
||
# conn.close()
|
||
# elif choice == "2":
|
||
# if not settings.get("config_file"):
|
||
# print("[ERROR] Файл конфигурации не выбран!")
|
||
# continue
|
||
# if not settings.get("port"):
|
||
# print("[ERROR] COM-порт не выбран!")
|
||
# continue
|
||
# conn = create_connection(settings)
|
||
# if conn:
|
||
# execute_commands_from_file(
|
||
# conn,
|
||
# settings["config_file"],
|
||
# settings.get("timeout", 10),
|
||
# settings.get("copy_mode", "line"),
|
||
# settings.get("block_size", 15),
|
||
# lambda msg: print(msg),
|
||
# login=settings.get("login"),
|
||
# password=settings.get("password"),
|
||
# is_gui=False,
|
||
# )
|
||
# conn.close()
|
||
# else:
|
||
# print("[ERROR] Не удалось установить соединение.")
|
||
# elif choice == "0":
|
||
# break
|
||
# else:
|
||
# print("[ERROR] Некорректный выбор.")
|
||
|
||
# ==========================
|
||
# Основной запуск приложения
|
||
# ==========================
|
||
def main():
|
||
setup_logging()
|
||
settings = settings_load()
|
||
app = SerialAppGUI(settings)
|
||
app.mainloop()
|
||
|
||
# ==========================
|
||
# Основной запуск приложения
|
||
# ==========================
|
||
if __name__ == "__main__":
|
||
try:
|
||
main()
|
||
except KeyboardInterrupt:
|
||
logging.info("Программа прервана пользователем (KeyboardInterrupt).")
|
||
sys.exit(0)
|
||
except Exception as e:
|
||
logging.critical(f"Неизвестная ошибка: {e}", exc_info=True)
|
||
sys.exit(1)
|