18 Commits

Author SHA1 Message Date
2f4b2985cd Remove documentation and CLI mode features
- Remove "Documentation" menu option
- Delete unused CLI mode code
- Clean up commented-out argument parsing function
2025-02-16 05:00:43 +03:00
d1a870fed7 Add update checking and documentation features
- Implement UpdateChecker for version comparison and update notifications
- Add menu options for documentation and update checking
- Enhance AboutWindow with dynamic version display
- Update requirements.txt with new dependencies
- Create infrastructure for opening local documentation
- Improve application menu with additional help options
2025-02-16 04:50:33 +03:00
2e2dd9e705 Add custom text and entry widgets with enhanced copy/paste functionality
- Implement CustomText and CustomEntry classes with advanced text interaction features
- Add context menu for text widgets with cut, copy, paste, and select all options
- Support multiple keyboard shortcuts for text manipulation
- Replace standard Tkinter Text and Entry widgets with custom implementations
- Remove global text/entry widget bindings in favor of class-specific methods
2025-02-16 03:57:48 +03:00
d937042ea2 Update application title to reflect project name
- Change window title from "Serial Device Manager" to "ComConfigCopy"
2025-02-16 03:53:21 +03:00
136c7877d3 Refactor TFTP file transfer with improved reliability and error handling
- Implement more robust file transfer mechanism with configurable retry and timeout settings
- Add detailed logging for transfer progress and error scenarios
- Enhance block transfer logic with better error recovery
- Simplify transfer socket management and cleanup process
- Improve overall transfer reliability and error tracking
2025-02-16 03:50:27 +03:00
467d582095 Improve file transfer progress tracking and display
- Add dynamic transfer speed calculation
- Compute and display estimated remaining transfer time
- Enhance remaining bytes display with more informative status
- Update transfers table with more detailed transfer progress information
2025-02-16 03:43:34 +03:00
16526b4643 Merge pull request 'TFTP' (#3) from TFTP into main
Reviewed-on: #3
2025-02-16 00:39:43 +00:00
b8bae39a17 Update .gitignore to include .venv/ directory 2025-02-16 03:37:28 +03:00
6d2819a860 Add network adapter selection for TFTP server
- Implement `get_network_adapters()` function to dynamically retrieve available network interfaces
- Replace IP address entry with a combobox for network adapter selection
- Add "Update" button to refresh network adapter list
- Improve IP address validation and error handling for TFTP server configuration
- Enhance UI with more user-friendly network interface selection
2025-02-16 03:34:57 +03:00
a252a0f153 Prevent duplicate TFTP server log messages and improve state tracking
- Add filtering mechanism to prevent repeated server start/stop log entries
- Implement state tracking flags to manage server status
- Remove redundant log messages in both ComConfigCopy.py and TFTPServer.py
- Enhance log callback to avoid unnecessary logging of server state changes
2025-02-16 03:31:32 +03:00
3126811f09 Improve TFTP server shutdown and error handling
- Enhance server stop mechanism with more robust socket and thread management
- Add better handling of active transfers during server shutdown
- Implement additional safety checks and timeout handling
- Improve logging and error reporting for server stop process
- Prevent potential deadlocks and resource leaks during server termination
2025-02-16 03:28:53 +03:00
f1ca31c198 Enhance TFTP server implementation with advanced monitoring and UI improvements
- Completely refactor TFTP server implementation with more robust file transfer handling
- Add detailed transfer tracking with active transfers table
- Implement periodic transfer status updates
- Improve log and UI layout for TFTP server tab
- Add more granular error handling and logging
- Enhance threading and socket management for file transfers
2025-02-16 03:19:44 +03:00
c95915483f Update .gitignore to include virtual environment directory
- Add '.venv/' to .gitignore to exclude Python virtual environment files
2025-02-16 03:09:23 +03:00
299ce329f7 Add TFTP server functionality to the application
- Implement TFTP server tab with IP and port configuration
- Create methods to start and stop TFTP server
- Add logging functionality for TFTP server events
- Integrate TFTPServer class into the main application
- Re-enable Firmware directory creation
2025-02-16 02:51:47 +03:00
dc81fed9d9 Remove TFTP server implementation 2025-02-16 02:35:25 +03:00
853a441eb9 Update .gitignore and remove Visual Studio project file
- Add 'output/' directory to .gitignore
- Remove ComConfigCopy.pyproj Visual Studio project file
2025-02-16 02:25:51 +03:00
ece18aea4d Add About window to application menu
- Import and integrate AboutWindow class
- Create new "Help" menu with "About Program" option
- Add `open_about()` method to display About window
- Enhance application's help and information accessibility
2025-02-16 02:24:40 +03:00
625b6d5558 Improve first-run detection and settings initialization
- Enhance `settings_load()` function to handle missing settings and parameters
- Improve first-run detection in `check_first_run()` method
- Add more robust settings file validation
- Remove unused imports
- Simplify and clean up code comments
2025-02-16 02:04:04 +03:00
8 changed files with 1204 additions and 267 deletions

4
.gitignore vendored
View File

@@ -4,6 +4,8 @@ Configs/Eltex MES2424 AC - Сеть FTTB 2G, доп.txt
Configs/конфиг доп 3750-52 с айпи 172.17.141.133 .txt
DALL·E 2024-12-29 01.01.02 - Square vector logo_ A clean and minimalistic app icon for serial port management software. The design prominently features a simplified rectangular CO.ico
test.py
__pycache__/TFTPServer.cpython-312.pyc
__pycache__/
Firmware/1.jpg
Firmware/2
output/
.venv/

View File

@@ -1,32 +1,27 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ------------------------------------------------------------
# Это программа для копирования конфигураций на коммутаторы
# Программа использует библиотеку PySerial для работы с последовательными портами
# Программа использует библиотеку PyQt5 для создания графического интерфейса
# Программа использует библиотеку PyQt5.QtWidgets для создания графического интерфейса
# Программа использует библиотеку PyQt5.QtCore для создания графического интерфейса
# Программа использует библиотеку PyQt5.QtGui для создания графического интерфейса
# ------------------------------------------------------------
# import argparse Использовался для получения аргументов из командной строки
# import platform Использовался для получения списка сетевых адаптеров
# import subprocess Использовался для получения списка сетевых адаптеров
# import argparse Использовался для получения аргументов из командной строки (не используется)
# import platform Использовался для получения списка сетевых адаптеров (не используется)
# import subprocess Использовался для получения списка сетевых адаптеров (не используется)
# import socket не используется
import json
import logging
import os
import re
import socket
import sys
import threading
import time
import webbrowser
from getpass import getpass
from logging.handlers import RotatingFileHandler
import tkinter as tk
from tkinter import (
Tk,
Frame,
StringVar,
END,
BOTH,
@@ -42,13 +37,21 @@ from tkinter import ttk
import serial
import serial.tools.list_ports
from serial.serialutil import SerialException
from about_window import AboutWindow
from TFTPServer import TFTPServer
# from TFTPServer import TFTPServerThread
import socket
from update_checker import UpdateChecker
# Версия программы
VERSION = "1.0.0"
# Создаем необходимые папки
os.makedirs("Logs", exist_ok=True)
os.makedirs("Configs", exist_ok=True)
os.makedirs("Settings", exist_ok=True)
# os.makedirs("Firmware", exist_ok=True)
os.makedirs("Firmware", exist_ok=True)
os.makedirs("docs", exist_ok=True)
# Файл настроек находится в папке Settings
SETTINGS_FILE = os.path.join("Settings", "settings.json")
@@ -82,21 +85,42 @@ def settings_load():
"block_size": 15, # Размер блока команд
"prompt": ">", # Используется для определения приглашения
}
# Создаем папку Settings, если её нет
os.makedirs("Settings", exist_ok=True)
if not os.path.exists(SETTINGS_FILE):
try:
# При первом запуске создаем новый файл настроек
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
json.dump(default_settings, f, indent=4, ensure_ascii=False)
logging.info("Файл настроек создан с настройками по умолчанию.")
return default_settings
except Exception as e:
logging.error(f"Ошибка при создании файла настроек: {e}", exc_info=True)
return default_settings
return default_settings
try:
# Пытаемся загрузить существующие настройки
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
settings = json.load(f)
# Проверяем наличие всех необходимых параметров
settings_changed = False
for key, value in default_settings.items():
if key not in settings:
settings[key] = value
settings_changed = True
# Если были добавлены новые параметры, сохраняем обновленные настройки
if settings_changed:
try:
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
json.dump(settings, f, indent=4, ensure_ascii=False)
logging.info("Файл настроек обновлен с новыми параметрами.")
except Exception as e:
logging.error(f"Ошибка при обновлении файла настроек: {e}", exc_info=True)
return settings
except Exception as e:
logging.error(f"Ошибка загрузки файла настроек: {e}", exc_info=True)
@@ -121,25 +145,37 @@ def list_serial_ports():
# Функции работы с сетевыми адаптерами (не используются)
# ==========================
# def list_network_adapters():
# """Возвращает список названий сетевых адаптеров (Windows)."""
# adapters = []
# if platform.system() == "Windows":
# try:
# output = subprocess.check_output(
# 'wmic nic get NetConnectionID',
# shell=True,
# encoding="cp866"
# )
# for line in output.splitlines():
# line = line.strip()
# if line and line != "NetConnectionID":
# adapters.append(line)
# except Exception as e:
# logging.error(f"Ошибка при получении списка адаптеров: {e}", exc_info=True)
# else:
# adapters = ["eth0"]
# return adapters
def get_network_adapters():
"""Получение списка сетевых адаптеров и их IP-адресов."""
adapters = []
try:
# Получаем имя хоста
hostname = socket.gethostname()
# Получаем все адреса для данного хоста
addresses = socket.getaddrinfo(hostname, None)
# Создаем множество для хранения уникальных IP-адресов
unique_ips = set()
for addr in addresses:
ip = addr[4][0]
# Пропускаем IPv6 и локальные адреса
if ':' not in ip and not ip.startswith('127.'):
unique_ips.add(ip)
# Добавляем все найденные IP-адреса в список
for ip in sorted(unique_ips):
adapters.append(f"{ip}")
# Добавляем 0.0.0.0 для прослушивания всех интерфейсов
adapters.insert(0, "0.0.0.0")
except Exception as e:
logging.error(f"Ошибка при получении списка сетевых адаптеров: {e}", exc_info=True)
# В случае ошибки возвращаем хотя бы 0.0.0.0
adapters = ["0.0.0.0"]
return adapters
# ==========================
# Функции работы с COM-соединением
@@ -407,9 +443,109 @@ def execute_commands_from_file(
logging.error(f"Ошибка при выполнении команд из файла: {e}", exc_info=True)
# ==========================
# Графический интерфейс (Tkinter)
# Улучшенные текстовые виджеты
# ==========================
class CustomText(tk.Text):
"""Улучшенный текстовый виджет с расширенной функциональностью копирования/вставки"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.create_context_menu()
self.bind_shortcuts()
def create_context_menu(self):
self.context_menu = tk.Menu(self, tearoff=0)
self.context_menu.add_command(label="Вырезать", command=self.cut)
self.context_menu.add_command(label="Копировать", command=self.copy)
self.context_menu.add_command(label="Вставить", command=self.paste)
self.context_menu.add_separator()
self.context_menu.add_command(label="Выделить всё", command=self.select_all)
self.bind("<Button-3>", self.show_context_menu)
def show_context_menu(self, event):
self.context_menu.post(event.x_root, event.y_root)
def bind_shortcuts(self):
# Стандартные сочетания
self.bind("<Control-x>", lambda e: self.event_generate("<<Cut>>"))
self.bind("<Control-c>", lambda e: self.event_generate("<<Copy>>"))
self.bind("<Control-v>", lambda e: self.event_generate("<<Paste>>"))
self.bind("<Control-a>", self.select_all)
# Shift+Insert для вставки
self.bind("<Shift-Insert>", lambda e: self.event_generate("<<Paste>>"))
# Ctrl+Insert для копирования
self.bind("<Control-Insert>", lambda e: self.event_generate("<<Copy>>"))
# Shift+Delete для вырезания
self.bind("<Shift-Delete>", lambda e: self.event_generate("<<Cut>>"))
def cut(self):
self.event_generate("<<Cut>>")
def copy(self):
self.event_generate("<<Copy>>")
def paste(self):
self.event_generate("<<Paste>>")
def select_all(self, event=None):
self.tag_add(tk.SEL, "1.0", tk.END)
self.mark_set(tk.INSERT, "1.0")
self.see(tk.INSERT)
return "break"
class CustomEntry(ttk.Entry):
"""Улучшенное поле ввода с расширенной функциональностью копирования/вставки"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.create_context_menu()
self.bind_shortcuts()
def create_context_menu(self):
self.context_menu = tk.Menu(self, tearoff=0)
self.context_menu.add_command(label="Вырезать", command=self.cut)
self.context_menu.add_command(label="Копировать", command=self.copy)
self.context_menu.add_command(label="Вставить", command=self.paste)
self.context_menu.add_separator()
self.context_menu.add_command(label="Выделить всё", command=self.select_all)
self.bind("<Button-3>", self.show_context_menu)
def show_context_menu(self, event):
self.context_menu.post(event.x_root, event.y_root)
def bind_shortcuts(self):
# Стандартные сочетания
self.bind("<Control-x>", lambda e: self.event_generate("<<Cut>>"))
self.bind("<Control-c>", lambda e: self.event_generate("<<Copy>>"))
self.bind("<Control-v>", lambda e: self.event_generate("<<Paste>>"))
self.bind("<Control-a>", self.select_all)
# Shift+Insert для вставки
self.bind("<Shift-Insert>", lambda e: self.event_generate("<<Paste>>"))
# Ctrl+Insert для копирования
self.bind("<Control-Insert>", lambda e: self.event_generate("<<Copy>>"))
# Shift+Delete для вырезания
self.bind("<Shift-Delete>", lambda e: self.event_generate("<<Cut>>"))
def cut(self):
self.event_generate("<<Cut>>")
def copy(self):
self.event_generate("<<Copy>>")
def paste(self):
self.event_generate("<<Paste>>")
def select_all(self, event=None):
self.select_range(0, tk.END)
return "break"
class SettingsWindow(tk.Toplevel):
def __init__(self, parent, settings, callback=None):
super().__init__(parent)
@@ -456,13 +592,13 @@ class SettingsWindow(tk.Toplevel):
# Размер блока
ttk.Label(settings_frame, text="Размер блока:").grid(row=4, column=0, sticky=W, pady=5)
self.block_size_var = StringVar(value=str(settings.get("block_size", 15)))
block_size_entry = ttk.Entry(settings_frame, textvariable=self.block_size_var)
block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var)
block_size_entry.grid(row=4, column=1, sticky=W, pady=5)
# Приглашение командной строки
ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5)
self.prompt_var = StringVar(value=settings.get("prompt", ">"))
prompt_entry = ttk.Entry(settings_frame, textvariable=self.prompt_var)
prompt_entry = CustomEntry(settings_frame, textvariable=self.prompt_var)
prompt_entry.grid(row=5, column=1, sticky=W, pady=5)
# Кнопки
@@ -511,22 +647,27 @@ class SettingsWindow(tk.Toplevel):
class SerialAppGUI(tk.Tk):
def __init__(self, settings):
super().__init__()
self.title("Serial Device Manager")
self.title("ComConfigCopy")
self.geometry("900x700")
# Добавляем VERSION как атрибут класса
self.VERSION = VERSION
# Инициализация проверки обновлений
self.update_checker = UpdateChecker(
VERSION,
"https://gitea.filow.ru/LowaSC/ComConfigCopy"
)
# Настройка стиля
self.style = ttk.Style(self)
self.style.theme_use("clam")
default_font = ("Segoe UI", 10)
self.option_add("*Font", default_font)
self.settings = settings
self.connection = None
# Глобальные биндинги
self.bind_class("Text", "<Control-c>", lambda event: event.widget.event_generate("<<Copy>>"))
self.bind_class("Text", "<Control-v>", lambda event: event.widget.event_generate("<<Paste>>"))
self.bind_class("Text", "<Control-x>", lambda event: event.widget.event_generate("<<Cut>>"))
self.bind_class("Entry", "<Control-c>", lambda event: event.widget.event_generate("<<Copy>>"))
self.bind_class("Entry", "<Control-v>", lambda event: event.widget.event_generate("<<Paste>>"))
self.bind_class("Entry", "<Control-x>", lambda event: event.widget.event_generate("<<Cut>>"))
self.tftp_server = None
self.create_menu()
self.create_tabs()
@@ -534,19 +675,6 @@ class SerialAppGUI(tk.Tk):
# Проверка первого запуска
self.check_first_run()
def check_first_run(self):
# Проверяем, существует ли папка Settings и файл settings.json
if not os.path.exists("Settings") or not os.path.exists(SETTINGS_FILE):
# Создаем папку Settings, если её нет
os.makedirs("Settings", exist_ok=True)
response = messagebox.askyesno(
"Первый запуск",
"Это первый запуск программы. Хотите настроить параметры подключения сейчас?"
)
if response:
self.open_settings()
def create_menu(self):
menubar = tk.Menu(self)
self.config(menu=menubar)
@@ -558,14 +686,84 @@ class SerialAppGUI(tk.Tk):
file_menu.add_separator()
file_menu.add_command(label="Выход", command=self.quit)
# Меню "Справка"
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Справка", menu=help_menu)
help_menu.add_command(label="Проверить обновления", command=self.check_for_updates)
help_menu.add_separator()
help_menu.add_command(label="О программе", command=self.open_about)
def check_for_updates(self):
"""Проверка наличия обновлений"""
def on_update_check(update_available, error):
if error:
messagebox.showerror(
"Ошибка проверки обновлений",
f"Не удалось проверить наличие обновлений:\n{error}"
)
elif update_available:
release_info = self.update_checker.get_release_notes()
if release_info:
response = messagebox.askyesno(
"Доступно обновление",
f"Доступна новая версия {release_info['version']}!\n\n"
f"Изменения:\n{release_info['description']}\n\n"
"Хотите перейти на страницу загрузки?",
)
if response:
webbrowser.open(release_info["download_url"])
else:
messagebox.showerror(
"Ошибка",
"Не удалось получить информацию о новой версии"
)
else:
messagebox.showinfo(
"Проверка обновлений",
"У вас установлена последняя версия программы"
)
self.update_checker.check_updates(callback=on_update_check)
def on_settings_changed(self):
"""Обработчик изменения настроек"""
self.settings = settings_load()
def open_settings(self):
"""Открытие окна настроек"""
settings_window = SettingsWindow(self, self.settings, self.on_settings_changed)
settings_window.transient(self)
settings_window.grab_set()
def on_settings_changed(self):
# Обновляем настройки в основном приложении
self.settings = settings_load()
def check_first_run(self):
"""Проверка первого запуска программы"""
show_settings = False
# Проверяем существование файла настроек
if not os.path.exists(SETTINGS_FILE):
show_settings = True
else:
# Проверяем содержимое файла настроек
try:
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
settings = json.load(f)
# Если порт не настроен, считаем это первым запуском
if settings.get("port") is None:
show_settings = True
except Exception:
# Если файл поврежден или не читается, тоже показываем настройки
show_settings = True
if show_settings:
# Создаем папку Settings, если её нет
os.makedirs("Settings", exist_ok=True)
response = messagebox.askyesno(
"Первый запуск",
"Это первый запуск программы. Хотите настроить параметры подключения сейчас?"
)
if response:
self.open_settings()
def create_tabs(self):
self.notebook = ttk.Notebook(self)
@@ -575,14 +773,17 @@ class SerialAppGUI(tk.Tk):
interactive_frame = ttk.Frame(self.notebook)
file_exec_frame = ttk.Frame(self.notebook)
config_editor_frame = ttk.Frame(self.notebook)
tftp_frame = ttk.Frame(self.notebook)
self.notebook.add(interactive_frame, text="Интерактивный режим")
self.notebook.add(file_exec_frame, text="Выполнение файла")
self.notebook.add(config_editor_frame, text="Редактор конфигурации")
self.notebook.add(tftp_frame, text="TFTP Сервер")
self.create_interactive_tab(interactive_frame)
self.create_file_exec_tab(file_exec_frame)
self.create_config_editor_tab(config_editor_frame)
self.create_tftp_tab(tftp_frame)
# -------------- Вкладка "Интерактивный режим" --------------
def create_interactive_tab(self, frame):
@@ -591,13 +792,13 @@ class SerialAppGUI(tk.Tk):
ttk.Button(control_frame, text="Подключиться", command=self.connect_device).pack(side=LEFT, padx=5)
ttk.Button(control_frame, text="Отключиться", command=self.disconnect_device).pack(side=LEFT, padx=5)
self.interactive_text = tk.Text(frame, wrap="word", height=20)
self.interactive_text = CustomText(frame, wrap="word", height=20)
self.interactive_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
input_frame = ttk.Frame(frame)
input_frame.pack(fill=X, pady=5)
ttk.Label(input_frame, text="Команда:").pack(side=LEFT, padx=5)
self.command_entry = ttk.Entry(input_frame, width=50)
self.command_entry = CustomEntry(input_frame, width=50)
self.command_entry.pack(side=LEFT, padx=5)
ttk.Button(input_frame, text="Отправить", command=self.send_command).pack(side=LEFT, padx=5)
@@ -688,10 +889,10 @@ class SerialAppGUI(tk.Tk):
file_frame.pack(fill=X, pady=5)
ttk.Label(file_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
self.file_exec_var = StringVar(value=self.settings.get("config_file") or "")
ttk.Entry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5)
CustomEntry(file_frame, textvariable=self.file_exec_var, width=40).pack(side=LEFT, padx=5)
ttk.Button(file_frame, text="Выбрать", command=self.select_config_file_fileexec).pack(side=LEFT, padx=5)
ttk.Button(frame, text="Выполнить команды", command=self.execute_file_commands).pack(pady=5)
self.file_exec_text = tk.Text(frame, wrap="word", height=15)
self.file_exec_text = CustomText(frame, wrap="word", height=15)
self.file_exec_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
def select_config_file_fileexec(self):
@@ -737,11 +938,11 @@ class SerialAppGUI(tk.Tk):
top_frame.pack(fill=X, pady=5)
ttk.Label(top_frame, text="Файл конфигурации:").pack(side=LEFT, padx=5)
self.editor_file_var = StringVar(value=self.settings.get("config_file") or "")
ttk.Entry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5)
CustomEntry(top_frame, textvariable=self.editor_file_var, width=40).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Выбрать", command=self.select_config_file_editor).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Загрузить", command=self.load_config_file).pack(side=LEFT, padx=5)
ttk.Button(top_frame, text="Сохранить", command=self.save_config_file).pack(side=LEFT, padx=5)
self.config_editor_text = tk.Text(frame, wrap="word")
self.config_editor_text = CustomText(frame, wrap="word")
self.config_editor_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
def select_config_file_editor(self):
@@ -780,81 +981,275 @@ class SerialAppGUI(tk.Tk):
logging.error(f"Ошибка сохранения файла: {e}", exc_info=True)
messagebox.showerror("Ошибка", f"Не удалось сохранить файл:\n{e}")
# ==========================
# Парсер аргументов (не используется)
# ==========================
# def parse_arguments():
# parser = argparse.ArgumentParser(
# description="Программа для работы с устройствами через последовательный порт с графическим интерфейсом."
# )
# parser.add_argument("--cli", action="store_true", help="Запустить в режиме командной строки (без графики)")
# return parser.parse_args()
def open_about(self):
about_window = AboutWindow(self)
about_window.transient(self)
about_window.grab_set()
# ==========================
# Режим командной строки (не используется)
# ==========================
def create_tftp_tab(self, frame):
"""Создание вкладки TFTP сервера."""
# Создаем фрейм для управления TFTP сервером
tftp_frame = ttk.Frame(frame)
tftp_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
# def run_cli_mode(settings):
# print("Запущен режим командной строки.")
# while True:
# print("\n--- Главное меню ---")
# print("1. Подключиться и войти в интерактивный режим")
# print("2. Выполнить команды из файла конфигурации")
# print("0. Выход")
# choice = input("Выберите действие: ").strip()
# if choice == "1":
# if not settings.get("port"):
# print("[ERROR] COM-порт не выбран!")
# continue
# conn = create_connection(settings)
# if not conn:
# print("[ERROR] Не удалось установить соединение.")
# continue
# print("[INFO] Введите команды (для выхода введите _exit):")
# while True:
# cmd = input("Команда: ")
# if cmd.strip().lower() == "_exit":
# break
# try:
# conn.write((cmd + "\n").encode())
# response = read_response(
# conn, settings.get("timeout", 10),
# login=settings.get("login"),
# password=settings.get("password"),
# is_gui=False
# )
# print(response)
# except Exception as e:
# print(f"[ERROR] {e}")
# break
# conn.close()
# elif choice == "2":
# if not settings.get("config_file"):
# print("[ERROR] Файл конфигурации не выбран!")
# continue
# if not settings.get("port"):
# print("[ERROR] COM-порт не выбран!")
# continue
# conn = create_connection(settings)
# if conn:
# execute_commands_from_file(
# conn,
# settings["config_file"],
# settings.get("timeout", 10),
# settings.get("copy_mode", "line"),
# settings.get("block_size", 15),
# lambda msg: print(msg),
# login=settings.get("login"),
# password=settings.get("password"),
# is_gui=False,
# )
# conn.close()
# else:
# print("[ERROR] Не удалось установить соединение.")
# elif choice == "0":
# break
# else:
# print("[ERROR] Некорректный выбор.")
# Создаем и размещаем элементы управления
controls_frame = ttk.LabelFrame(tftp_frame, text="Управление TFTP сервером")
controls_frame.pack(fill=X, padx=5, pady=5)
# IP адрес
ip_frame = ttk.Frame(controls_frame)
ip_frame.pack(fill=X, padx=5, pady=2)
ttk.Label(ip_frame, text="IP адрес:").pack(side=LEFT, padx=5)
self.tftp_ip_var = StringVar(value="0.0.0.0")
self.tftp_ip_combo = ttk.Combobox(ip_frame, textvariable=self.tftp_ip_var, state="readonly")
self.tftp_ip_combo.pack(side=LEFT, fill=X, expand=True, padx=5)
ttk.Button(ip_frame, text="Обновить", command=self.update_network_adapters).pack(side=LEFT, padx=5)
# Заполняем список адаптеров
self.update_network_adapters()
# Порт
port_frame = ttk.Frame(controls_frame)
port_frame.pack(fill=X, padx=5, pady=2)
ttk.Label(port_frame, text="Порт:").pack(side=LEFT, padx=5)
self.tftp_port_var = StringVar(value="69")
self.tftp_port_entry = CustomEntry(port_frame, textvariable=self.tftp_port_var)
self.tftp_port_entry.pack(fill=X, expand=True, padx=5)
# Кнопки управления
buttons_frame = ttk.Frame(controls_frame)
buttons_frame.pack(fill=X, padx=5, pady=5)
self.start_tftp_button = ttk.Button(
buttons_frame,
text="Запустить сервер",
command=self.start_tftp_server
)
self.start_tftp_button.pack(side=LEFT, padx=5)
self.stop_tftp_button = ttk.Button(
buttons_frame,
text="Остановить сервер",
command=self.stop_tftp_server,
state="disabled"
)
self.stop_tftp_button.pack(side=LEFT, padx=5)
# Лог сервера
log_frame = ttk.LabelFrame(tftp_frame, text="Лог сервера")
log_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
self.tftp_log_text = CustomText(log_frame, wrap=tk.WORD, height=10)
self.tftp_log_text.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Добавляем скроллбар для лога
scrollbar = ttk.Scrollbar(self.tftp_log_text, command=self.tftp_log_text.yview)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self.tftp_log_text.config(yscrollcommand=scrollbar.set)
# Статус передач
transfers_frame = ttk.LabelFrame(tftp_frame, text="Активные передачи")
transfers_frame.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Создаем таблицу для отображения активных передач
columns = ("client", "filename", "progress", "remaining", "time")
self.transfers_tree = ttk.Treeview(transfers_frame, columns=columns, show="headings")
# Настраиваем заголовки колонок
self.transfers_tree.heading("client", text="Клиент")
self.transfers_tree.heading("filename", text="Файл")
self.transfers_tree.heading("progress", text="Прогресс")
self.transfers_tree.heading("remaining", text="Осталось")
self.transfers_tree.heading("time", text="Время")
# Настраиваем ширину колонок
self.transfers_tree.column("client", width=120)
self.transfers_tree.column("filename", width=150)
self.transfers_tree.column("progress", width=100)
self.transfers_tree.column("remaining", width=100)
self.transfers_tree.column("time", width=80)
self.transfers_tree.pack(fill=BOTH, expand=True, padx=5, pady=5)
# Инициализация TFTP сервера
self.tftp_server = None
self.tftp_server_thread = None
def start_tftp_server(self):
"""Запуск TFTP сервера."""
try:
# Получаем выбранный IP-адрес
ip = self.tftp_ip_var.get()
if not ip:
messagebox.showerror("Ошибка", "Выберите IP-адрес для TFTP сервера")
return
# Проверяем корректность порта
try:
port = int(self.tftp_port_var.get())
if port <= 0 or port > 65535:
raise ValueError("Порт должен быть в диапазоне 1-65535")
except ValueError as e:
messagebox.showerror("Ошибка", f"Некорректный порт: {str(e)}")
return
# Создаем экземпляр TFTP сервера
self.tftp_server = TFTPServer("Firmware")
# Устанавливаем callback для логирования
def log_callback(message):
# Фильтруем дублирующиеся сообщения о запуске/остановке сервера
if "[INFO] TFTP сервер запущен" in message and hasattr(self, '_server_started'):
return
if "[INFO] TFTP сервер остановлен" in message and hasattr(self, '_server_stopped'):
return
self.append_tftp_log(message)
# Устанавливаем флаги для отслеживания состояния
if "[INFO] TFTP сервер запущен" in message:
self._server_started = True
elif "[INFO] TFTP сервер остановлен" in message:
self._server_stopped = True
# Обновляем информацию о передачах
self.update_transfers_info()
self.tftp_server.set_log_callback(log_callback)
# Запускаем сервер в отдельном потоке
self.tftp_server_thread = threading.Thread(
target=self.run_tftp_server,
args=(ip, port),
daemon=True
)
self.tftp_server_thread.start()
# Обновляем состояние кнопок и элементов управления
self.start_tftp_button.config(state="disabled")
self.stop_tftp_button.config(state="normal")
self.tftp_ip_combo.config(state="disabled")
self.tftp_port_entry.config(state="disabled")
# Запускаем периодическое обновление информации о передачах
self.update_transfers_periodically()
except Exception as e:
self.append_tftp_log(f"[ERROR] Ошибка запуска сервера: {str(e)}")
messagebox.showerror("Ошибка", f"Не удалось запустить TFTP сервер: {str(e)}")
def run_tftp_server(self, ip, port):
"""Запуск TFTP сервера в отдельном потоке."""
try:
self.tftp_server.start_server(ip, port)
except Exception as e:
self.append_tftp_log(f"[ERROR] Ошибка работы сервера: {str(e)}")
def stop_tftp_server(self):
"""Остановка TFTP сервера."""
if self.tftp_server:
try:
# Отключаем кнопки на время остановки сервера
self.start_tftp_button.config(state="disabled")
self.stop_tftp_button.config(state="disabled")
# Сбрасываем флаги состояния
if hasattr(self, '_server_started'):
delattr(self, '_server_started')
if hasattr(self, '_server_stopped'):
delattr(self, '_server_stopped')
# Останавливаем сервер
self.tftp_server.stop_server()
# Ждем завершения потока сервера с таймаутом
if self.tftp_server_thread:
self.tftp_server_thread.join(timeout=5.0)
if self.tftp_server_thread.is_alive():
self.append_tftp_log("[WARN] Превышено время ожидания остановки сервера")
# Очищаем ссылки на сервер и поток
self.tftp_server = None
self.tftp_server_thread = None
# Обновляем состояние кнопок
self.start_tftp_button.config(state="normal")
self.stop_tftp_button.config(state="disabled")
self.tftp_ip_combo.config(state="normal")
self.tftp_port_entry.config(state="normal")
# Очищаем таблицу передач
for item in self.transfers_tree.get_children():
self.transfers_tree.delete(item)
except Exception as e:
self.append_tftp_log(f"[ERROR] Ошибка остановки сервера: {str(e)}")
messagebox.showerror("Ошибка", f"Не удалось остановить TFTP сервер: {str(e)}")
# Восстанавливаем состояние кнопок в случае ошибки
self.start_tftp_button.config(state="disabled")
self.stop_tftp_button.config(state="normal")
def append_tftp_log(self, message):
"""Добавление сообщения в лог TFTP сервера."""
self.tftp_log_text.insert(END, message + "\n")
self.tftp_log_text.see(END)
def update_transfers_info(self):
"""Обновление информации об активных передачах."""
if not self.tftp_server:
return
# Очищаем текущие записи
for item in self.transfers_tree.get_children():
self.transfers_tree.delete(item)
# Добавляем информацию о текущих передачах
for client_addr, transfer_info in self.tftp_server.active_transfers.items():
filename = transfer_info['filename']
bytes_sent = transfer_info['bytes_sent']
filesize = transfer_info['filesize']
start_time = transfer_info['start_time']
# Вычисляем прогресс
progress = f"{bytes_sent}/{filesize} байт"
remaining_bytes = filesize - bytes_sent
elapsed_time = time.time() - start_time
# Вычисляем скорость передачи (байт/сек)
if elapsed_time > 0:
transfer_speed = bytes_sent / elapsed_time
# Вычисляем оставшееся время
if transfer_speed > 0:
remaining_time = remaining_bytes / transfer_speed
remaining_str = f"{remaining_bytes} байт (~{int(remaining_time)}с)"
else:
remaining_str = f"{remaining_bytes} байт (неизвестно)"
else:
remaining_str = f"{remaining_bytes} байт (вычисляется...)"
# Добавляем запись в таблицу
self.transfers_tree.insert("", END, values=(
f"{client_addr[0]}:{client_addr[1]}",
filename,
progress,
remaining_str,
f"{elapsed_time:.1f}с"
))
def update_transfers_periodically(self):
"""Периодическое обновление информации о передачах."""
if self.tftp_server and self.tftp_server.running:
self.update_transfers_info()
# Планируем следующее обновление через 1 секунду
self.after(1000, self.update_transfers_periodically)
def update_network_adapters(self):
"""Обновление списка сетевых адаптеров."""
adapters = get_network_adapters()
self.tftp_ip_combo["values"] = adapters
if not self.tftp_ip_var.get() in adapters:
self.tftp_ip_var.set(adapters[0])
# ==========================
# Основной запуск приложения

View File

@@ -1,37 +0,0 @@
<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="4.0">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>5a5ba42b-743e-401d-ac33-9abfe8f095a1</ProjectGuid>
<ProjectHome>.</ProjectHome>
<StartupFile>ComConfigCopy.py</StartupFile>
<SearchPath>
</SearchPath>
<WorkingDirectory>.</WorkingDirectory>
<OutputPath>.</OutputPath>
<Name>ComConfigCopy</Name>
<RootNamespace>ComConfigCopy</RootNamespace>
<TestFramework>Pytest</TestFramework>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
<DebugSymbols>true</DebugSymbols>
<EnableUnmanagedDebugging>false</EnableUnmanagedDebugging>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<DebugSymbols>true</DebugSymbols>
<EnableUnmanagedDebugging>false</EnableUnmanagedDebugging>
</PropertyGroup>
<ItemGroup>
<Compile Include="ComConfigCopy.py" />
<Compile Include="test.py" />
</ItemGroup>
<Import Project="$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)\Python Tools\Microsoft.PythonTools.targets" />
<!-- Uncomment the CoreCompile target to enable the Build command in
Visual Studio and specify your pre- and post-build commands in
the BeforeBuild and AfterBuild targets below. -->
<!--<Target Name="CoreCompile" />-->
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
</Project>

44
README.md Normal file
View File

@@ -0,0 +1,44 @@
# ComConfigCopy
Программа для копирования конфигураций на коммутаторы.
## Описание
ComConfigCopy - это утилита, разработанная для автоматизации процесса копирования конфигураций на сетевые коммутаторы. Программа предоставляет удобный графический интерфейс для управления процессом копирования и настройки параметров подключения.
## Основные возможности
- Копирование конфигураций на коммутаторы через COM-порт
- Поддержка различных скоростей подключения
- Автоматическое определение доступных COM-портов
- Возможность сохранения и загрузки настроек
- Автоматическое обновление через GitHub
## Системные требования
- Windows 7/8/10/11
- Python 3.8 или выше
- Доступ к COM-портам
## Установка
1. Скачайте последнюю версию программы из [репозитория](https://gitea.filow.ru/LowaSC/ComConfigCopy/releases)
2. Распакуйте архив в удобное место
3. Запустите файл `ComConfigCopy.exe`
## Использование
1. Выберите COM-порт из списка доступных
2. Настройте параметры подключения (скорость, биты данных и т.д.)
3. Выберите файл конфигурации для отправки
4. Нажмите кнопку "Отправить" для начала процесса копирования
## Контакты
- Email: LowaWorkMail@gmail.com
- Telegram: [@LowaSC](https://t.me/LowaSC)
- Репозиторий: [ComConfigCopy](https://gitea.filow.ru/LowaSC/ComConfigCopy)
## Лицензия
Этот проект распространяется под лицензией MIT. Подробности смотрите в файле [LICENSE](LICENSE).

View File

@@ -1,98 +1,349 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Модуль для TFTP-сервера.
TFTP сервер для передачи прошивки с компьютера на коммутатор.
- Создает сервер по заданному IP и порту.
- Расшаривает папку Firmware.
- Показывает текущее состояние сервера и статус передачи файла:
- кому (IP устройства),
- сколько осталось байт,
- сколько передано байт,
- время передачи.
"""
import socket
import threading
import os
import socket
import struct
import threading
import time
class TFTPServerThread(threading.Thread):
def __init__(self, host, port, log_callback):
super().__init__()
self.host = host
self.port = port
self.log_callback = log_callback
self.running = True
self.sock = None
class TFTPServer:
def __init__(self, share_folder):
"""
Инициализация TFTP сервера.
def run(self):
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind((self.host, self.port))
self.log_callback(f"TFTP сервер запущен на {self.host}:{self.port}\n")
except Exception as e:
self.log_callback(f"Ошибка запуска TFTP сервера: {e}\n")
return
while self.running:
try:
self.sock.settimeout(1)
data, addr = self.sock.recvfrom(1024)
if data:
self.handle_rrq(data, addr)
except socket.timeout:
continue
except Exception as e:
self.log_callback(f"Ошибка: {e}\n")
if self.sock:
self.sock.close()
self.log_callback("TFTP сервер остановлен.\n")
def handle_rrq(self, data, addr):
opcode = int.from_bytes(data[0:2], byteorder='big')
if opcode != 1:
self.log_callback(f"Получен не RRQ запрос от {addr}\n")
return
parts = data[2:].split(b'\0')
if len(parts) < 2:
self.log_callback(f"Неверный формат RRQ от {addr}\n")
return
req_filename = parts[0].decode('utf-8', errors='ignore')
mode = parts[1].decode('utf-8', errors='ignore')
self.log_callback(f"Получен RRQ для файла '{req_filename}' (режим {mode}) от {addr}\n")
filepath = os.path.join("Firmware", req_filename)
if not os.path.exists(filepath):
self.log_callback(f"Файл '{req_filename}' не найден в папке Firmware\n")
return
try:
filesize = os.path.getsize(filepath)
f = open(filepath, 'rb')
except Exception as e:
self.log_callback(f"Ошибка открытия файла '{req_filename}': {e}\n")
return
block_num = 1
bytes_sent = 0
start_time = time.time()
while True:
block_data = f.read(512)
data_packet = b'\x00\x03' + block_num.to_bytes(2, byteorder='big') + block_data
self.sock.sendto(data_packet, addr)
self.log_callback(f"Отправлен блок {block_num} ({len(block_data)} байт)\n")
try:
self.sock.settimeout(5)
ack, ack_addr = self.sock.recvfrom(1024)
if ack_addr != addr:
continue
ack_opcode = int.from_bytes(ack[0:2], byteorder='big')
ack_block = int.from_bytes(ack[2:4], byteorder='big')
if ack_opcode != 4 or ack_block != block_num:
self.log_callback(f"Неверный ACK от {addr}\n")
break
except socket.timeout:
self.log_callback("Таймаут ожидания ACK\n")
break
bytes_sent += len(block_data)
elapsed = time.time() - start_time
speed = bytes_sent / elapsed if elapsed > 0 else 0
progress = (bytes_sent / filesize) * 100 if filesize > 0 else 0
self.log_callback(f"Прогресс: {progress:.2f}% | Скорость: {speed:.2f} байт/с\n")
if len(block_data) < 512:
break
block_num += 1
f.close()
def stop(self):
:param share_folder: Путь к папке, содержащей файлы (например, папка 'Firmware')
"""
self.share_folder = share_folder
self.log_callback = None
self.running = False
self.server_socket = None
self.lock = threading.Lock()
self.transfer_sockets = set() # Множество для хранения всех активных сокетов передачи
# Словарь активных передач для мониторинга их статуса.
# Ключ адрес клиента, значение словарь с информацией о передаче.
self.active_transfers = {}
def set_log_callback(self, callback):
"""
Установка функции обратного вызова для логирования сообщений.
:param callback: Функция, принимающая строку сообщения.
"""
self.log_callback = callback
def log(self, message):
"""
Функция логирования: вызывает callback (если задан) или выводит сообщение в консоль.
:param message: Строка с сообщением для логирования.
"""
if self.log_callback:
self.log_callback(message)
else:
print(message)
def start_server(self, ip, port):
"""
Запуск TFTP сервера на указанном IP и порту.
:param ip: IP-адрес для привязки сервера.
:param port: Порт для TFTP сервера.
"""
if self.running:
self.log("[WARN] Сервер уже запущен")
return
self.running = True
try:
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.server_socket.bind((ip, port))
self.log(f"[INFO] TFTP сервер запущен на {ip}:{port}")
while self.running:
try:
self.server_socket.settimeout(1.0)
data, client_addr = self.server_socket.recvfrom(2048)
if data and self.running:
threading.Thread(target=self.handle_request, args=(data, client_addr), daemon=True).start()
except socket.timeout:
continue
except socket.error as e:
if self.running: # Логируем ошибку только если сервер еще запущен
self.log(f"[ERROR] Ошибка получения данных: {str(e)}")
break
except Exception as e:
if self.running: # Логируем ошибку только если сервер еще запущен
self.log(f"[ERROR] Ошибка получения данных: {str(e)}")
break
except Exception as e:
self.log(f"[ERROR] Ошибка запуска сервера: {str(e)}")
finally:
self.running = False
if self.server_socket:
try:
self.server_socket.close()
except:
pass
self.server_socket = None
def stop_server(self):
"""
Остановка TFTP сервера.
"""
if not self.running:
return
self.log("[INFO] Остановка TFTP сервера...")
self.running = False
try:
# Закрываем основной сокет сервера первым
if self.server_socket:
try:
# Создаем временный сокет и отправляем пакет самому себе,
# чтобы разблокировать recvfrom
temp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
server_address = self.server_socket.getsockname()
temp_socket.sendto(b'', server_address)
except:
pass
finally:
try:
temp_socket.close()
except:
pass
try:
self.server_socket.close()
except Exception as e:
self.log(f"[WARN] Ошибка при закрытии основного сокета: {str(e)}")
finally:
self.server_socket = None
# Закрываем все активные сокеты передачи
with self.lock:
active_sockets = list(self.transfer_sockets)
self.transfer_sockets.clear()
active_transfers = dict(self.active_transfers)
self.active_transfers.clear()
# Закрываем сокеты передачи после очистки множества
for sock in active_sockets:
try:
if sock:
sock.close()
except Exception as e:
self.log(f"[WARN] Ошибка при закрытии сокета передачи: {str(e)}")
# Отправляем сообщения об остановке для активных передач
for client_addr, transfer_info in active_transfers.items():
try:
self.send_error(client_addr, 0, "Сервер остановлен")
except:
pass
except Exception as e:
self.log(f"[ERROR] Ошибка при остановке сервера: {str(e)}")
finally:
self.running = False # Гарантируем, что флаг running будет False
self.log("[INFO] TFTP сервер остановлен")
def handle_request(self, data, client_addr):
"""
Обработка входящего запроса от клиента.
:param data: Полученные данные (UDP-пакет).
:param client_addr: Адрес клиента, отправившего пакет.
"""
if len(data) < 2:
self.log(f"[WARN] Получен некорректный пакет от {client_addr}")
return
opcode = struct.unpack("!H", data[:2])[0]
if opcode == 1: # RRQ (Read Request) запрос на чтение файла
self.handle_rrq(data, client_addr)
else:
self.log(f"[WARN] Неподдерживаемый запрос (опкод {opcode}) от {client_addr}")
def handle_rrq(self, data, client_addr):
"""
Обработка запроса на чтение файла (RRQ).
:param data: Данные запроса.
:param client_addr: Адрес клиента.
"""
try:
# RRQ формата: 2 байта опкода, затем строка имени файла, за которой следует 0,
# затем строка режима (например, "octet"), и завершается 0.
parts = data[2:].split(b'\0')
if len(parts) < 2:
self.log(f"[WARN] Некорректный RRQ пакет от {client_addr}")
return
filename = parts[0].decode('utf-8')
mode = parts[1].decode('utf-8').lower()
self.log(f"[INFO] Получен RRQ от {client_addr}: файл '{filename}', режим '{mode}'")
if mode != "octet":
self.send_error(client_addr, 0, "Поддерживается только octet режим")
return
file_path = os.path.join(self.share_folder, filename)
if not os.path.isfile(file_path):
self.send_error(client_addr, 1, "Файл не найден")
return
# Запускаем передачу файла в новом потоке.
threading.Thread(target=self.send_file, args=(file_path, client_addr), daemon=True).start()
except Exception as e:
self.log(f"[ERROR] Ошибка обработки RRQ: {str(e)}")
def send_error(self, client_addr, error_code, error_message):
"""
Отправка сообщения об ошибке клиенту.
:param client_addr: Адрес клиента.
:param error_code: Код ошибки.
:param error_message: Текст ошибки.
"""
# Формируем TFTP пакет ошибки: 2 байта опкода (5), 2 байта кода ошибки, сообщение об ошибке и завершающий 0.
packet = struct.pack("!HH", 5, error_code) + error_message.encode('utf-8') + b'\0'
self.server_socket.sendto(packet, client_addr)
self.log(f"[INFO] Отправлено сообщение об ошибке '{error_message}' клиенту {client_addr}")
def send_file(self, file_path, client_addr):
"""
Передача файла клиенту по протоколу TFTP.
"""
BLOCK_SIZE = 512
MAX_RETRIES = 5
TIMEOUT = 2.0
transfer_socket = None
try:
if not os.path.exists(file_path):
self.log(f"[ERROR] Файл '{file_path}' не существует")
self.send_error(client_addr, 1, "Файл не найден")
return
filesize = os.path.getsize(file_path)
if filesize == 0:
self.log(f"[ERROR] Файл '{file_path}' пуст")
self.send_error(client_addr, 0, "Файл пуст")
return
start_time = time.time()
file_basename = os.path.basename(file_path)
# Регистрируем активную передачу
with self.lock:
self.active_transfers[client_addr] = {
'filename': file_basename,
'filesize': filesize,
'bytes_sent': 0,
'start_time': start_time
}
# Создаем новый сокет для передачи данных
transfer_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
transfer_socket.settimeout(TIMEOUT)
with self.lock:
self.transfer_sockets.add(transfer_socket)
self.log(f"[INFO] Начало передачи файла '{file_basename}' клиенту {client_addr}. Размер файла: {filesize} байт.")
with open(file_path, 'rb') as file:
block_number = 1
last_successful_block = 0
while True:
# Читаем блок данных
data = file.read(BLOCK_SIZE)
# Формируем и отправляем пакет данных
packet = struct.pack('!HH', 3, block_number) + data
retries = 0
while retries < MAX_RETRIES:
try:
transfer_socket.sendto(packet, client_addr)
# Ожидаем подтверждение
while True:
try:
ack_data, ack_addr = transfer_socket.recvfrom(4)
if ack_addr == client_addr and len(ack_data) >= 4:
opcode, ack_block = struct.unpack('!HH', ack_data)
if opcode == 4: # ACK
if ack_block == block_number:
# Успешное подтверждение
last_successful_block = block_number
bytes_sent = min((block_number * BLOCK_SIZE), filesize)
# Обновляем информацию о прогрессе
with self.lock:
if client_addr in self.active_transfers:
self.active_transfers[client_addr]['bytes_sent'] = bytes_sent
# Логируем статус каждую секунду
current_time = time.time()
if current_time - start_time >= 1.0:
bytes_remaining = filesize - bytes_sent
elapsed_time = current_time - start_time
self.log(f"[STATUS] Клиент: {client_addr} | Файл: {file_basename} | "
f"Отправлено: {bytes_sent}/{filesize} байт | "
f"Осталось: {bytes_remaining} байт | "
f"Время: {elapsed_time:.2f} сек.")
break
elif ack_block < block_number:
# Получен старый ACK, игнорируем
continue
except socket.timeout:
break
if last_successful_block == block_number:
break
else:
retries += 1
self.log(f"[WARN] Таймаут ожидания ACK для блока {block_number} от {client_addr}. "
f"Попытка {retries + 1}.")
except Exception as e:
retries += 1
self.log(f"[ERROR] Ошибка при передаче блока {block_number}: {str(e)}")
if retries >= MAX_RETRIES:
self.log(f"[ERROR] Превышено максимальное количество попыток для блока {block_number}")
return
block_number += 1
# Если отправили меньше BLOCK_SIZE байт, это был последний блок
if len(data) < BLOCK_SIZE:
break
self.log(f"[INFO] Передача файла '{file_basename}' клиенту {client_addr} завершена успешно")
except Exception as e:
self.log(f"[ERROR] Ошибка при передаче файла: {str(e)}")
finally:
# Очищаем информацию о передаче
with self.lock:
if client_addr in self.active_transfers:
del self.active_transfers[client_addr]
if transfer_socket in self.transfer_sockets:
self.transfer_sockets.remove(transfer_socket)
if transfer_socket:
try:
transfer_socket.close()
except:
pass

103
about_window.py Normal file
View File

@@ -0,0 +1,103 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import tkinter as tk
from tkinter import ttk, BOTH, X, BOTTOM, END
import webbrowser
class AboutWindow(tk.Toplevel):
def __init__(self, parent):
super().__init__(parent)
self.title("О программе")
self.geometry("600x500")
self.resizable(False, False)
# Сохраняем ссылку на родительское окно
self.parent = parent
# Создаем фрейм для содержимого
about_frame = ttk.Frame(self, padding="20")
about_frame.pack(fill=BOTH, expand=True, padx=10, pady=10)
# Заголовок
ttk.Label(
about_frame,
text="ComConfigCopy",
font=("Segoe UI", 16, "bold")
).pack(pady=(0, 10))
# Описание
ttk.Label(
about_frame,
text="Программа для копирования конфигураций на коммутаторы",
wraplength=350
).pack(pady=(0, 20))
# Версия
ttk.Label(
about_frame,
text=f"Версия {getattr(parent, 'VERSION', '1.0.0')}",
font=("Segoe UI", 10)
).pack(pady=(0, 20))
# Контактная информация
contact_frame = ttk.Frame(about_frame)
contact_frame.pack(fill=X, pady=(0, 20))
# Исходный код
ttk.Label(
contact_frame,
text="Исходный код:",
font=("Segoe UI", 10, "bold")
).pack(anchor="w")
source_link = ttk.Label(
contact_frame,
text="https://gitea.filow.ru/LowaSC/ComConfigCopy",
cursor="hand2",
foreground="blue"
)
source_link.pack(anchor="w")
source_link.bind("<Button-1>", lambda e: self.open_url("https://gitea.filow.ru/LowaSC/ComConfigCopy"))
# Контакты
ttk.Label(
contact_frame,
text="\nКонтакты:",
font=("Segoe UI", 10, "bold")
).pack(anchor="w")
ttk.Label(
contact_frame,
text="Email: LowaWorkMail@gmail.com"
).pack(anchor="w")
telegram_link = ttk.Label(
contact_frame,
text="Telegram: @LowaSC",
cursor="hand2",
foreground="blue"
)
telegram_link.pack(anchor="w")
telegram_link.bind("<Button-1>", lambda e: self.open_url("https://t.me/LowaSC"))
# Кнопка закрытия
ttk.Button(
self,
text="Закрыть",
command=self.destroy
).pack(side=BOTTOM, pady=10)
# Центрируем окно
self.center_window()
def center_window(self):
self.update_idletasks()
width = self.winfo_width()
height = self.winfo_height()
x = (self.winfo_screenwidth() // 2) - (width // 2)
y = (self.winfo_screenheight() // 2) - (height // 2)
self.geometry(f"{width}x{height}+{x}+{y}")
def open_url(self, url):
webbrowser.open(url)

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
tftpy>=0.8.0
pyserial>=3.5
requests>=2.31.0
packaging>=23.2

175
update_checker.py Normal file
View File

@@ -0,0 +1,175 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import logging
import requests
import threading
from packaging import version
class UpdateCheckError(Exception):
"""Исключение для ошибок проверки обновлений"""
pass
class UpdateChecker:
"""Класс для проверки обновлений программы"""
def __init__(self, current_version, repo_url):
self.current_version = current_version
self.repo_url = repo_url
# Формируем базовый URL API
self.api_url = repo_url.replace("gitea.filow.ru", "gitea.filow.ru/api/v1/repos/LowaSC/ComConfigCopy")
self._update_available = False
self._latest_version = None
self._latest_release = None
self._error = None
self._changelog = None
def get_changelog(self, callback=None):
"""
Получение changelog из репозитория.
:param callback: Функция обратного вызова, которая будет вызвана после получения changelog
"""
def fetch():
try:
# Пытаемся получить CHANGELOG.md из репозитория
response = requests.get(f"{self.api_url}/contents/CHANGELOG.md", timeout=10)
response.raise_for_status()
content = response.json()
if "content" in content:
import base64
changelog_content = base64.b64decode(content["content"]).decode("utf-8")
self._changelog = changelog_content
self._error = None
else:
raise UpdateCheckError("Не удалось получить содержимое CHANGELOG.md")
except requests.RequestException as e:
error_msg = f"Ошибка получения changelog: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
self._changelog = None
except Exception as e:
error_msg = f"Неизвестная ошибка при получении changelog: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
self._changelog = None
finally:
if callback:
callback(self._changelog, self._error)
# Запускаем получение в отдельном потоке
threading.Thread(target=fetch, daemon=True).start()
def check_updates(self, callback=None):
"""
Проверка наличия обновлений.
:param callback: Функция обратного вызова, которая будет вызвана после проверки
"""
def check():
try:
response = requests.get(f"{self.api_url}/releases", timeout=10)
response.raise_for_status()
releases = response.json()
if not releases:
raise UpdateCheckError("Не найдено релизов в репозитории")
latest_release = releases[0]
latest_version = latest_release.get("tag_name", "").lstrip("v")
if not latest_version:
raise UpdateCheckError("Не удалось определить версию последнего релиза")
try:
if version.parse(latest_version) > version.parse(self.current_version):
self._update_available = True
self._latest_version = latest_version
self._latest_release = latest_release
logging.info(f"Доступно обновление: {latest_version}")
else:
logging.info("Обновления не требуются")
except version.InvalidVersion as e:
raise UpdateCheckError(f"Некорректный формат версии: {e}")
self._error = None
except requests.RequestException as e:
error_msg = f"Ошибка сетевого подключения: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
except UpdateCheckError as e:
logging.error(str(e), exc_info=True)
self._error = str(e)
except Exception as e:
error_msg = f"Неизвестная ошибка при проверке обновлений: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
finally:
if callback:
callback(self._update_available, self._error)
@property
def update_available(self):
"""Доступно ли обновление"""
return self._update_available
@property
def latest_version(self):
"""Последняя доступная версия"""
return self._latest_version
@property
def error(self):
"""Последняя ошибка при проверке обновлений"""
return self._error
@property
def changelog(self):
"""Текущий changelog"""
return self._changelog
def get_release_notes(self):
"""Получение информации о последнем релизе"""
if self._latest_release:
return {
"version": self._latest_version,
"description": self._latest_release.get("body", ""),
"download_url": self._latest_release.get("assets", [{}])[0].get("browser_download_url", "")
}
return None
def get_releases(self, callback=None):
"""
Получение списка релизов из репозитория.
:param callback: Функция обратного вызова, которая будет вызвана после получения списка релизов
"""
def fetch():
try:
response = requests.get(f"{self.api_url}/releases", timeout=10)
response.raise_for_status()
releases = response.json()
if not releases:
raise UpdateCheckError("Не найдено релизов в репозитории")
self._error = None
if callback:
callback(releases, None)
except requests.RequestException as e:
error_msg = f"Ошибка сетевого подключения: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
if callback:
callback(None, error_msg)
except Exception as e:
error_msg = f"Ошибка при получении списка релизов: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
if callback:
callback(None, error_msg)
# Запускаем получение в отдельном потоке
threading.Thread(target=fetch, daemon=True).start()