The application has been refactored. Functions have been moved to separate libraries. Many different functions have been added. Performance is still poor

This commit is contained in:
2025-02-16 21:57:01 +03:00
parent a140b7d8a0
commit cb5329ddb7
46 changed files with 5316 additions and 0 deletions

View File

View File

@@ -0,0 +1,174 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
from typing import List, Optional, Dict, Any
import logging
from core.exceptions import ValidationError
from .serial_manager import SerialManager
class CommandHandler:
"""Обработчик команд для работы с устройством."""
def __init__(self, serial_manager: SerialManager):
self._serial = serial_manager
self._logger = logging.getLogger(__name__)
self._command_cache: Dict[str, str] = {}
def validate_command(self, command: str) -> None:
"""
Валидация команды перед отправкой.
Args:
command: Команда для проверки
Raises:
ValidationError: Если команда не прошла валидацию
"""
# Проверяем на пустую команду
if not command or not command.strip():
raise ValidationError("Пустая команда")
# Проверяем на недопустимые символы
if re.search(r'[\x00-\x1F\x7F]', command):
raise ValidationError("Команда содержит недопустимые символы")
# Проверяем максимальную длину
if len(command) > 1024:
raise ValidationError("Превышена максимальная длина команды (1024 символа)")
def validate_config_commands(self, commands: List[str]) -> None:
"""
Валидация списка команд конфигурации.
Args:
commands: Список команд для проверки
Raises:
ValidationError: Если команды не прошли валидацию
"""
if not commands:
raise ValidationError("Пустой список команд")
for command in commands:
self.validate_command(command)
def execute_command(self, command: str, timeout: Optional[int] = None,
use_cache: bool = False) -> str:
"""
Выполнение команды на устройстве.
Args:
command: Команда для выполнения
timeout: Таймаут ожидания ответа
use_cache: Использовать кэш команд
Returns:
str: Ответ устройства
Raises:
ValidationError: При ошибке валидации
ConnectionError: При ошибке соединения
"""
# Проверяем команду
self.validate_command(command)
# Проверяем кэш
if use_cache and command in self._command_cache:
self._logger.debug(f"Использован кэш для команды: {command}")
return self._command_cache[command]
# Выполняем команду
response = self._serial.send_command(command, timeout)
# Сохраняем в кэш
if use_cache:
self._command_cache[command] = response
return response
def execute_config(self, commands: List[str], timeout: Optional[int] = None) -> str:
"""
Выполнение конфигурационных команд на устройстве.
Args:
commands: Список команд для выполнения
timeout: Таймаут ожидания ответа
Returns:
str: Ответ устройства
Raises:
ValidationError: При ошибке валидации
ConnectionError: При ошибке соединения
"""
# Проверяем команды
self.validate_config_commands(commands)
# Выполняем команды
return self._serial.send_config(commands, timeout)
def clear_cache(self) -> None:
"""Очистка кэша команд."""
self._command_cache.clear()
self._logger.debug("Кэш команд очищен")
def get_device_info(self) -> Dict[str, str]:
"""
Получение информации об устройстве.
Returns:
Dict[str, str]: Словарь с информацией об устройстве
Raises:
ConnectionError: При ошибке соединения
"""
info = {}
try:
# Получаем версию ПО
version = self.execute_command("show version", use_cache=True)
info["version"] = self._parse_version(version)
# Получаем hostname
hostname = self.execute_command("show hostname", use_cache=True)
info["hostname"] = hostname.strip()
# Получаем модель
model = self.execute_command("show model", use_cache=True)
info["model"] = self._parse_model(model)
except Exception as e:
self._logger.error(f"Ошибка получения информации об устройстве: {e}")
raise
return info
def _parse_version(self, version_output: str) -> str:
"""
Парсинг версии из вывода команды.
Args:
version_output: Вывод команды show version
Returns:
str: Версия ПО
"""
# Здесь должна быть реализация парсинга версии
# в зависимости от формата вывода конкретного устройства
return version_output.strip()
def _parse_model(self, model_output: str) -> str:
"""
Парсинг модели из вывода команды.
Args:
model_output: Вывод команды show model
Returns:
str: Модель устройства
"""
# Здесь должна быть реализация парсинга модели
# в зависимости от формата вывода конкретного устройства
return model_output.strip()

View File

@@ -0,0 +1,119 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from core.events import event_bus, Event, EventTypes
from core.exceptions import ConnectionError, AuthenticationError
class BaseProtocol(ABC):
"""Базовый класс для всех протоколов связи."""
def __init__(self):
self._connected = False
self._authenticated = False
self._config: Dict[str, Any] = {}
@property
def is_connected(self) -> bool:
"""Проверка состояния подключения."""
return self._connected
@property
def is_authenticated(self) -> bool:
"""Проверка состояния аутентификации."""
return self._authenticated
@abstractmethod
def connect(self) -> None:
"""
Установка соединения.
Raises:
ConnectionError: При ошибке подключения
"""
pass
@abstractmethod
def disconnect(self) -> None:
"""Разрыв соединения."""
pass
@abstractmethod
def authenticate(self, username: str, password: str) -> None:
"""
Аутентификация на устройстве.
Args:
username: Имя пользователя
password: Пароль
Raises:
AuthenticationError: При ошибке аутентификации
"""
pass
@abstractmethod
def send_command(self, command: str, timeout: Optional[int] = None) -> str:
"""
Отправка команды на устройство.
Args:
command: Команда для отправки
timeout: Таймаут ожидания ответа
Returns:
str: Ответ устройства
Raises:
ConnectionError: При ошибке отправки команды
"""
pass
@abstractmethod
def send_config(self, config_commands: list[str], timeout: Optional[int] = None) -> str:
"""
Отправка конфигурационных команд на устройство.
Args:
config_commands: Список команд конфигурации
timeout: Таймаут ожидания ответа
Returns:
str: Ответ устройства
Raises:
ConnectionError: При ошибке отправки команд
"""
pass
def configure(self, **kwargs) -> None:
"""
Конфигурация протокола.
Args:
**kwargs: Параметры конфигурации
"""
self._config.update(kwargs)
def _notify_connection_established(self) -> None:
"""Уведомление об установке соединения."""
self._connected = True
event_bus.publish(Event(EventTypes.CONNECTION_ESTABLISHED, None))
def _notify_connection_lost(self) -> None:
"""Уведомление о потере соединения."""
self._connected = False
self._authenticated = False
event_bus.publish(Event(EventTypes.CONNECTION_LOST, None))
def _notify_connection_error(self, error: Exception) -> None:
"""
Уведомление об ошибке соединения.
Args:
error: Объект ошибки
"""
self._connected = False
self._authenticated = False
event_bus.publish(Event(EventTypes.CONNECTION_ERROR, str(error)))

View File

@@ -0,0 +1,224 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import re
from typing import Optional, List
import serial
from serial.serialutil import SerialException
from core.config import AppConfig
from core.exceptions import ConnectionError, AuthenticationError
from .base import BaseProtocol
class SerialProtocol(BaseProtocol):
"""Протокол для работы с последовательным портом."""
def __init__(self):
super().__init__()
self._serial: Optional[serial.Serial] = None
self._prompt = AppConfig.DEFAULT_PROMPT
self._timeout = AppConfig.DEFAULT_TIMEOUT
self._baudrate = AppConfig.DEFAULT_BAUDRATE
self._port: Optional[str] = None
def configure(self, port: str, baudrate: int = None, timeout: int = None,
prompt: str = None) -> None:
"""
Конфигурация последовательного порта.
Args:
port: COM-порт
baudrate: Скорость передачи
timeout: Таймаут операций
prompt: Приглашение командной строки
"""
self._port = port
if baudrate is not None:
self._baudrate = baudrate
if timeout is not None:
self._timeout = timeout
if prompt is not None:
self._prompt = prompt
def connect(self) -> None:
"""
Установка соединения с устройством.
Raises:
ConnectionError: При ошибке подключения
"""
if not self._port:
raise ConnectionError("Не указан COM-порт")
try:
self._serial = serial.Serial(
port=self._port,
baudrate=self._baudrate,
timeout=1
)
time.sleep(1) # Ждем инициализации порта
self._notify_connection_established()
except SerialException as e:
self._notify_connection_error(e)
raise ConnectionError(f"Ошибка подключения к порту {self._port}: {e}")
def disconnect(self) -> None:
"""Закрытие соединения."""
if self._serial and self._serial.is_open:
self._serial.close()
self._notify_connection_lost()
def authenticate(self, username: str, password: str) -> None:
"""
Аутентификация на устройстве.
Args:
username: Имя пользователя
password: Пароль
Raises:
AuthenticationError: При ошибке аутентификации
ConnectionError: При ошибке соединения
"""
if not self.is_connected:
raise ConnectionError("Нет подключения к устройству")
try:
# Очищаем буфер
self._serial.reset_input_buffer()
self._serial.reset_output_buffer()
# Отправляем Enter для получения приглашения
self._serial.write(b"\n")
time.sleep(0.5)
# Ожидаем запрос логина или пароля
response = self._read_until(["login:", "username:", "password:", self._prompt])
if "login:" in response.lower() or "username:" in response.lower():
self._serial.write(f"{username}\n".encode())
time.sleep(0.5)
response = self._read_until(["password:", self._prompt])
if "password:" in response.lower():
self._serial.write(f"{password}\n".encode())
time.sleep(0.5)
response = self._read_until([self._prompt])
if self._prompt not in response:
raise AuthenticationError("Неверные учетные данные")
self._authenticated = True
except SerialException as e:
self._notify_connection_error(e)
raise ConnectionError(f"Ошибка при аутентификации: {e}")
def send_command(self, command: str, timeout: Optional[int] = None) -> str:
"""
Отправка команды на устройство.
Args:
command: Команда для отправки
timeout: Таймаут ожидания ответа
Returns:
str: Ответ устройства
Raises:
ConnectionError: При ошибке отправки команды
"""
if not self.is_connected:
raise ConnectionError("Нет подключения к устройству")
if not self.is_authenticated:
raise ConnectionError("Требуется аутентификация")
try:
# Очищаем буфер перед отправкой
self._serial.reset_input_buffer()
# Отправляем команду
self._serial.write(f"{command}\n".encode())
# Ожидаем ответ
response = self._read_until([self._prompt], timeout or self._timeout)
# Удаляем отправленную команду из ответа
lines = response.splitlines()
if lines and command in lines[0]:
lines.pop(0)
return "\n".join(lines).strip()
except SerialException as e:
self._notify_connection_error(e)
raise ConnectionError(f"Ошибка при отправке команды: {e}")
def send_config(self, config_commands: List[str], timeout: Optional[int] = None) -> str:
"""
Отправка конфигурационных команд на устройство.
Args:
config_commands: Список команд конфигурации
timeout: Таймаут ожидания ответа
Returns:
str: Ответ устройства
Raises:
ConnectionError: При ошибке отправки команд
"""
responses = []
for command in config_commands:
response = self.send_command(command, timeout)
responses.append(response)
return "\n".join(responses)
def _read_until(self, patterns: List[str], timeout: Optional[int] = None) -> str:
"""
Чтение данных из порта до появления одного из паттернов.
Args:
patterns: Список паттернов для поиска
timeout: Таймаут операции
Returns:
str: Прочитанные данные
Raises:
ConnectionError: При ошибке чтения или таймауте
"""
if not patterns:
raise ValueError("Не указаны паттерны для поиска")
timeout = timeout or self._timeout
end_time = time.time() + timeout
buffer = ""
while time.time() < end_time:
if self._serial.in_waiting:
chunk = self._serial.read(self._serial.in_waiting).decode(errors="ignore")
buffer += chunk
# Проверяем наличие паттернов
for pattern in patterns:
if pattern in buffer:
return buffer
# Небольшая задержка для снижения нагрузки на CPU
time.sleep(0.1)
raise ConnectionError(f"Таймаут операции ({timeout} сек)")
@staticmethod
def list_ports() -> List[str]:
"""
Получение списка доступных COM-портов.
Returns:
List[str]: Список доступных портов
"""
return [port.device for port in serial.tools.list_ports.comports()]

View File

@@ -0,0 +1,190 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
from typing import Optional, Callable
import tftpy
from core.config import AppConfig
from core.exceptions import TFTPError
from core.events import event_bus, Event, EventTypes
class TFTPProtocol:
"""Протокол для работы с TFTP сервером."""
def __init__(self):
self._server: Optional[tftpy.TftpServer] = None
self._client: Optional[tftpy.TftpClient] = None
self._server_running = False
self._root_dir = AppConfig.CONFIGS_DIR
self._port = AppConfig.TFTP_PORT
self._timeout = AppConfig.TFTP_TIMEOUT
self._retries = AppConfig.TFTP_RETRIES
def configure(self, root_dir: Optional[str] = None, port: Optional[int] = None,
timeout: Optional[int] = None, retries: Optional[int] = None) -> None:
"""
Конфигурация TFTP сервера/клиента.
Args:
root_dir: Корневая директория для файлов
port: Порт TFTP сервера
timeout: Таймаут операций
retries: Количество попыток
"""
if root_dir is not None:
self._root_dir = root_dir
if port is not None:
self._port = port
if timeout is not None:
self._timeout = timeout
if retries is not None:
self._retries = retries
def start_server(self, host: str = "0.0.0.0") -> None:
"""
Запуск TFTP сервера.
Args:
host: IP-адрес для прослушивания
Raises:
TFTPError: При ошибке запуска сервера
"""
if self._server_running:
return
try:
# Создаем серверный объект
self._server = tftpy.TftpServer(self._root_dir)
# Запускаем сервер в отдельном потоке
self._server.listen(host, self._port, timeout=self._timeout)
self._server_running = True
event_bus.publish(Event(EventTypes.TFTP_SERVER_STARTED, {
"host": host,
"port": self._port,
"root_dir": self._root_dir
}))
except Exception as e:
raise TFTPError(f"Ошибка запуска TFTP сервера: {e}")
def stop_server(self) -> None:
"""Остановка TFTP сервера."""
if self._server and self._server_running:
self._server.stop()
self._server = None
self._server_running = False
event_bus.publish(Event(EventTypes.TFTP_SERVER_STOPPED, None))
def upload_file(self, filename: str, host: str, remote_filename: Optional[str] = None,
progress_callback: Optional[Callable[[int], None]] = None) -> None:
"""
Загрузка файла на удаленное устройство.
Args:
filename: Путь к локальному файлу
host: IP-адрес устройства
remote_filename: Имя файла на устройстве
progress_callback: Функция обратного вызова для отслеживания прогресса
Raises:
TFTPError: При ошибке загрузки файла
"""
if not os.path.exists(filename):
raise TFTPError(f"Файл не найден: {filename}")
try:
# Создаем клиентский объект
self._client = tftpy.TftpClient(
host,
self._port,
options={"timeout": self._timeout, "retries": self._retries}
)
# Определяем имя удаленного файла
if not remote_filename:
remote_filename = os.path.basename(filename)
event_bus.publish(Event(EventTypes.TFTP_TRANSFER_STARTED, {
"operation": "upload",
"local_file": filename,
"remote_file": remote_filename,
"host": host
}))
# Загружаем файл
self._client.upload(
remote_filename,
filename,
progress_callback
)
event_bus.publish(Event(EventTypes.TFTP_TRANSFER_COMPLETED, {
"operation": "upload",
"local_file": filename,
"remote_file": remote_filename,
"host": host
}))
except Exception as e:
event_bus.publish(Event(EventTypes.TFTP_ERROR, str(e)))
raise TFTPError(f"Ошибка загрузки файла: {e}")
def download_file(self, remote_filename: str, host: str, local_filename: Optional[str] = None,
progress_callback: Optional[Callable[[int], None]] = None) -> None:
"""
Загрузка файла с удаленного устройства.
Args:
remote_filename: Имя файла на устройстве
host: IP-адрес устройства
local_filename: Путь для сохранения файла
progress_callback: Функция обратного вызова для отслеживания прогресса
Raises:
TFTPError: При ошибке загрузки файла
"""
try:
# Создаем клиентский объект
self._client = tftpy.TftpClient(
host,
self._port,
options={"timeout": self._timeout, "retries": self._retries}
)
# Определяем имя локального файла
if not local_filename:
local_filename = os.path.join(self._root_dir, remote_filename)
event_bus.publish(Event(EventTypes.TFTP_TRANSFER_STARTED, {
"operation": "download",
"local_file": local_filename,
"remote_file": remote_filename,
"host": host
}))
# Загружаем файл
self._client.download(
remote_filename,
local_filename,
progress_callback
)
event_bus.publish(Event(EventTypes.TFTP_TRANSFER_COMPLETED, {
"operation": "download",
"local_file": local_filename,
"remote_file": remote_filename,
"host": host
}))
except Exception as e:
event_bus.publish(Event(EventTypes.TFTP_ERROR, str(e)))
raise TFTPError(f"Ошибка загрузки файла: {e}")
@property
def is_server_running(self) -> bool:
"""Проверка состояния сервера."""
return self._server_running

View File

@@ -0,0 +1,223 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import logging
@dataclass
class ParsedResponse:
"""Структура разобранного ответа."""
success: bool
data: Any
error: Optional[str] = None
raw_response: Optional[str] = None
class ResponseParser:
"""Парсер ответов от сетевого устройства."""
def __init__(self):
self._logger = logging.getLogger(__name__)
# Регулярные выражения для парсинга
self._patterns = {
"error": r"(?i)error|invalid|failed|denied|rejected",
"success": r"(?i)success|completed|done|ok",
"version": r"(?i)version\s+(\S+)",
"model": r"(?i)model\s*:\s*(\S+)",
"hostname": r"(?i)hostname\s*:\s*(\S+)",
"interface": r"(?i)interface\s+(\S+)",
"ip_address": r"(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})",
"mac_address": r"([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})",
}
def parse_command_response(self, response: str) -> ParsedResponse:
"""
Парсинг ответа на команду.
Args:
response: Ответ устройства
Returns:
ParsedResponse: Структура с результатами парсинга
"""
# Проверяем на наличие ошибок
if re.search(self._patterns["error"], response, re.MULTILINE):
error_msg = self._extract_error_message(response)
return ParsedResponse(
success=False,
data=None,
error=error_msg,
raw_response=response
)
# Проверяем на успешное выполнение
success = bool(re.search(self._patterns["success"], response, re.MULTILINE))
return ParsedResponse(
success=success,
data=response.strip(),
raw_response=response
)
def parse_version(self, response: str) -> ParsedResponse:
"""
Парсинг версии ПО.
Args:
response: Ответ на команду show version
Returns:
ParsedResponse: Структура с результатами парсинга
"""
match = re.search(self._patterns["version"], response)
if match:
return ParsedResponse(
success=True,
data=match.group(1),
raw_response=response
)
return ParsedResponse(
success=False,
data=None,
error="Версия не найдена",
raw_response=response
)
def parse_model(self, response: str) -> ParsedResponse:
"""
Парсинг модели устройства.
Args:
response: Ответ на команду show model
Returns:
ParsedResponse: Структура с результатами парсинга
"""
match = re.search(self._patterns["model"], response)
if match:
return ParsedResponse(
success=True,
data=match.group(1),
raw_response=response
)
return ParsedResponse(
success=False,
data=None,
error="Модель не найдена",
raw_response=response
)
def parse_interfaces(self, response: str) -> ParsedResponse:
"""
Парсинг информации об интерфейсах.
Args:
response: Ответ на команду show interfaces
Returns:
ParsedResponse: Структура с результатами парсинга
"""
interfaces = []
# Ищем все интерфейсы
for line in response.splitlines():
if_match = re.search(self._patterns["interface"], line)
if if_match:
interface = {
"name": if_match.group(1),
"ip": None,
"mac": None
}
# Ищем IP-адрес
ip_match = re.search(self._patterns["ip_address"], line)
if ip_match:
interface["ip"] = ip_match.group(1)
# Ищем MAC-адрес
mac_match = re.search(self._patterns["mac_address"], line)
if mac_match:
interface["mac"] = mac_match.group(0)
interfaces.append(interface)
if interfaces:
return ParsedResponse(
success=True,
data=interfaces,
raw_response=response
)
return ParsedResponse(
success=False,
data=None,
error="Интерфейсы не найдены",
raw_response=response
)
def parse_config_result(self, response: str) -> ParsedResponse:
"""
Парсинг результата применения конфигурации.
Args:
response: Ответ на команды конфигурации
Returns:
ParsedResponse: Структура с результатами парсинга
"""
# Проверяем на наличие ошибок
if re.search(self._patterns["error"], response, re.MULTILINE):
error_msg = self._extract_error_message(response)
return ParsedResponse(
success=False,
data=None,
error=error_msg,
raw_response=response
)
# Если нет ошибок, считаем что конфигурация применена успешно
return ParsedResponse(
success=True,
data=response.strip(),
raw_response=response
)
def _extract_error_message(self, response: str) -> str:
"""
Извлечение сообщения об ошибке из ответа.
Args:
response: Ответ устройства
Returns:
str: Сообщение об ошибке
"""
# Ищем строку с ошибкой
for line in response.splitlines():
if re.search(self._patterns["error"], line, re.IGNORECASE):
return line.strip()
return "Неизвестная ошибка"
def add_pattern(self, name: str, pattern: str) -> None:
"""
Добавление нового шаблона для парсинга.
Args:
name: Имя шаблона
pattern: Регулярное выражение
"""
self._patterns[name] = pattern
self._logger.debug(f"Добавлен новый шаблон: {name} = {pattern}")
def get_pattern(self, name: str) -> Optional[str]:
"""
Получение шаблона по имени.
Args:
name: Имя шаблона
Returns:
Optional[str]: Регулярное выражение или None
"""
return self._patterns.get(name)

View File

@@ -0,0 +1,148 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Optional, List, Dict, Any
import logging
from core.events import event_bus, Event, EventTypes
from core.exceptions import ConnectionError, AuthenticationError
from core.config import AppConfig
from .protocols.serial import SerialProtocol
class SerialManager:
"""Менеджер для работы с последовательным портом."""
def __init__(self):
self._protocol = SerialProtocol()
self._logger = logging.getLogger(__name__)
self._connected = False
self._authenticated = False
def configure(self, settings: Dict[str, Any]) -> None:
"""
Конфигурация последовательного порта.
Args:
settings: Словарь с настройками
"""
self._protocol.configure(
port=settings.get("port"),
baudrate=settings.get("baudrate", AppConfig.DEFAULT_BAUDRATE),
timeout=settings.get("timeout", AppConfig.DEFAULT_TIMEOUT),
prompt=settings.get("prompt", AppConfig.DEFAULT_PROMPT)
)
def connect(self) -> None:
"""
Установка соединения с устройством.
Raises:
ConnectionError: При ошибке подключения
"""
try:
self._protocol.connect()
self._connected = True
self._logger.info("Соединение установлено")
except ConnectionError as e:
self._connected = False
self._authenticated = False
self._logger.error(f"Ошибка подключения: {e}")
raise
def disconnect(self) -> None:
"""Разрыв соединения."""
if self._connected:
self._protocol.disconnect()
self._connected = False
self._authenticated = False
self._logger.info("Соединение разорвано")
def authenticate(self, username: str, password: str) -> None:
"""
Аутентификация на устройстве.
Args:
username: Имя пользователя
password: Пароль
Raises:
ConnectionError: При ошибке соединения
AuthenticationError: При ошибке аутентификации
"""
try:
self._protocol.authenticate(username, password)
self._authenticated = True
self._logger.info("Аутентификация успешна")
except (ConnectionError, AuthenticationError) as e:
self._authenticated = False
self._logger.error(f"Ошибка аутентификации: {e}")
raise
def send_command(self, command: str, timeout: Optional[int] = None) -> str:
"""
Отправка команды на устройство.
Args:
command: Команда для отправки
timeout: Таймаут ожидания ответа
Returns:
str: Ответ устройства
Raises:
ConnectionError: При ошибке отправки команды
"""
try:
response = self._protocol.send_command(command, timeout)
self._logger.debug(f"Отправлена команда: {command}")
self._logger.debug(f"Получен ответ: {response}")
return response
except ConnectionError as e:
self._logger.error(f"Ошибка отправки команды: {e}")
raise
def send_config(self, config_commands: List[str], timeout: Optional[int] = None) -> str:
"""
Отправка конфигурационных команд на устройство.
Args:
config_commands: Список команд конфигурации
timeout: Таймаут ожидания ответа
Returns:
str: Ответ устройства
Raises:
ConnectionError: При ошибке отправки команд
"""
try:
response = self._protocol.send_config(config_commands, timeout)
self._logger.info(f"Отправлено {len(config_commands)} команд конфигурации")
return response
except ConnectionError as e:
self._logger.error(f"Ошибка отправки конфигурации: {e}")
raise
@property
def is_connected(self) -> bool:
"""Проверка состояния подключения."""
return self._connected
@property
def is_authenticated(self) -> bool:
"""Проверка состояния аутентификации."""
return self._authenticated
@staticmethod
def list_ports() -> List[str]:
"""
Получение списка доступных COM-портов.
Returns:
List[str]: Список доступных портов
"""
return SerialProtocol.list_ports()