Files
ComConfigCopy/src/communication/protocols/serial.py

224 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import re
from typing import Optional, List
import serial
from serial.serialutil import SerialException
from core.config import AppConfig
from core.exceptions import ConnectionError, AuthenticationError
from .base import BaseProtocol
class SerialProtocol(BaseProtocol):
"""Протокол для работы с последовательным портом."""
def __init__(self):
super().__init__()
self._serial: Optional[serial.Serial] = None
self._prompt = AppConfig.DEFAULT_PROMPT
self._timeout = AppConfig.DEFAULT_TIMEOUT
self._baudrate = AppConfig.DEFAULT_BAUDRATE
self._port: Optional[str] = None
def configure(self, port: str, baudrate: int = None, timeout: int = None,
prompt: str = None) -> None:
"""
Конфигурация последовательного порта.
Args:
port: COM-порт
baudrate: Скорость передачи
timeout: Таймаут операций
prompt: Приглашение командной строки
"""
self._port = port
if baudrate is not None:
self._baudrate = baudrate
if timeout is not None:
self._timeout = timeout
if prompt is not None:
self._prompt = prompt
def connect(self) -> None:
"""
Установка соединения с устройством.
Raises:
ConnectionError: При ошибке подключения
"""
if not self._port:
raise ConnectionError("Не указан COM-порт")
try:
self._serial = serial.Serial(
port=self._port,
baudrate=self._baudrate,
timeout=1
)
time.sleep(1) # Ждем инициализации порта
self._notify_connection_established()
except SerialException as e:
self._notify_connection_error(e)
raise ConnectionError(f"Ошибка подключения к порту {self._port}: {e}")
def disconnect(self) -> None:
"""Закрытие соединения."""
if self._serial and self._serial.is_open:
self._serial.close()
self._notify_connection_lost()
def authenticate(self, username: str, password: str) -> None:
"""
Аутентификация на устройстве.
Args:
username: Имя пользователя
password: Пароль
Raises:
AuthenticationError: При ошибке аутентификации
ConnectionError: При ошибке соединения
"""
if not self.is_connected:
raise ConnectionError("Нет подключения к устройству")
try:
# Очищаем буфер
self._serial.reset_input_buffer()
self._serial.reset_output_buffer()
# Отправляем Enter для получения приглашения
self._serial.write(b"\n")
time.sleep(0.5)
# Ожидаем запрос логина или пароля
response = self._read_until(["login:", "username:", "password:", self._prompt])
if "login:" in response.lower() or "username:" in response.lower():
self._serial.write(f"{username}\n".encode())
time.sleep(0.5)
response = self._read_until(["password:", self._prompt])
if "password:" in response.lower():
self._serial.write(f"{password}\n".encode())
time.sleep(0.5)
response = self._read_until([self._prompt])
if self._prompt not in response:
raise AuthenticationError("Неверные учетные данные")
self._authenticated = True
except SerialException as e:
self._notify_connection_error(e)
raise ConnectionError(f"Ошибка при аутентификации: {e}")
def send_command(self, command: str, timeout: Optional[int] = None) -> str:
"""
Отправка команды на устройство.
Args:
command: Команда для отправки
timeout: Таймаут ожидания ответа
Returns:
str: Ответ устройства
Raises:
ConnectionError: При ошибке отправки команды
"""
if not self.is_connected:
raise ConnectionError("Нет подключения к устройству")
if not self.is_authenticated:
raise ConnectionError("Требуется аутентификация")
try:
# Очищаем буфер перед отправкой
self._serial.reset_input_buffer()
# Отправляем команду
self._serial.write(f"{command}\n".encode())
# Ожидаем ответ
response = self._read_until([self._prompt], timeout or self._timeout)
# Удаляем отправленную команду из ответа
lines = response.splitlines()
if lines and command in lines[0]:
lines.pop(0)
return "\n".join(lines).strip()
except SerialException as e:
self._notify_connection_error(e)
raise ConnectionError(f"Ошибка при отправке команды: {e}")
def send_config(self, config_commands: List[str], timeout: Optional[int] = None) -> str:
"""
Отправка конфигурационных команд на устройство.
Args:
config_commands: Список команд конфигурации
timeout: Таймаут ожидания ответа
Returns:
str: Ответ устройства
Raises:
ConnectionError: При ошибке отправки команд
"""
responses = []
for command in config_commands:
response = self.send_command(command, timeout)
responses.append(response)
return "\n".join(responses)
def _read_until(self, patterns: List[str], timeout: Optional[int] = None) -> str:
"""
Чтение данных из порта до появления одного из паттернов.
Args:
patterns: Список паттернов для поиска
timeout: Таймаут операции
Returns:
str: Прочитанные данные
Raises:
ConnectionError: При ошибке чтения или таймауте
"""
if not patterns:
raise ValueError("Не указаны паттерны для поиска")
timeout = timeout or self._timeout
end_time = time.time() + timeout
buffer = ""
while time.time() < end_time:
if self._serial.in_waiting:
chunk = self._serial.read(self._serial.in_waiting).decode(errors="ignore")
buffer += chunk
# Проверяем наличие паттернов
for pattern in patterns:
if pattern in buffer:
return buffer
# Небольшая задержка для снижения нагрузки на CPU
time.sleep(0.1)
raise ConnectionError(f"Таймаут операции ({timeout} сек)")
@staticmethod
def list_ports() -> List[str]:
"""
Получение списка доступных COM-портов.
Returns:
List[str]: Список доступных портов
"""
return [port.device for port in serial.tools.list_ports.comports()]