Enhance update checking

- Refactor UpdateChecker
- Add support for parsing release details with improved formatting
- Implement more robust version comparison and release type handling
- Add logging for update checking process
- Improve error handling and release information extraction
- Update update checking logic to handle stable and pre-release versions
This commit is contained in:
2025-02-17 00:53:24 +03:00
parent f84e20631b
commit 1a511ff54f
3 changed files with 167 additions and 157 deletions

View File

@@ -524,15 +524,16 @@ class SettingsWindow(tk.Toplevel):
copy_mode_frame = ttk.Frame(settings_frame) copy_mode_frame = ttk.Frame(settings_frame)
copy_mode_frame.grid(row=3, column=1, sticky=W, pady=5) copy_mode_frame.grid(row=3, column=1, sticky=W, pady=5)
ttk.Radiobutton(copy_mode_frame, text="Построчно", value="line", ttk.Radiobutton(copy_mode_frame, text="Построчно", value="line",
variable=self.copy_mode_var).pack(side=LEFT) variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT)
ttk.Radiobutton(copy_mode_frame, text="Блоками", value="block", ttk.Radiobutton(copy_mode_frame, text="Блоками", value="block",
variable=self.copy_mode_var).pack(side=LEFT) variable=self.copy_mode_var, command=self.toggle_block_size).pack(side=LEFT)
# Размер блока # Размер блока
ttk.Label(settings_frame, text="Размер блока:").grid(row=4, column=0, sticky=W, pady=5) self.block_size_label = ttk.Label(settings_frame, text="Размер блока:")
self.block_size_label.grid(row=4, column=0, sticky=W, pady=5)
self.block_size_var = StringVar(value=str(settings.get("block_size", 15))) self.block_size_var = StringVar(value=str(settings.get("block_size", 15)))
block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var) self.block_size_entry = CustomEntry(settings_frame, textvariable=self.block_size_var)
block_size_entry.grid(row=4, column=1, sticky=W, pady=5) self.block_size_entry.grid(row=4, column=1, sticky=W, pady=5)
# Приглашение командной строки # Приглашение командной строки
ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5) ttk.Label(settings_frame, text="Приглашение:").grid(row=5, column=0, sticky=W, pady=5)
@@ -548,9 +549,21 @@ class SettingsWindow(tk.Toplevel):
self.update_ports() self.update_ports()
# Инициализация видимости поля размера блока
self.toggle_block_size()
# Центрируем окно # Центрируем окно
self.center_window() self.center_window()
# Переключение видимости поля размера блока
def toggle_block_size(self):
if self.copy_mode_var.get() == "line":
self.block_size_label.grid_remove()
self.block_size_entry.grid_remove()
else:
self.block_size_label.grid()
self.block_size_entry.grid()
# Центрирование окна # Центрирование окна
def center_window(self): def center_window(self):
self.update_idletasks() self.update_idletasks()
@@ -670,7 +683,8 @@ class SerialAppGUI(tk.Tk):
# Инициализация проверки обновлений # Инициализация проверки обновлений
self.update_checker = UpdateChecker( self.update_checker = UpdateChecker(
VERSION, VERSION,
"https://gitea.filow.ru/LowaSC/ComConfigCopy" "https://gitea.filow.ru/LowaSC/ComConfigCopy",
include_prereleases=False # Проверяем только стабильные версии
) )
# Настройка стиля # Настройка стиля
@@ -719,14 +733,20 @@ class SerialAppGUI(tk.Tk):
elif update_available: elif update_available:
release_info = self.update_checker.get_release_notes() release_info = self.update_checker.get_release_notes()
if release_info: if release_info:
# Форматируем сообщение
message = (
f"Доступна новая версия {release_info['version']}!\n\n"
f"Тип релиза: {'Пре-релиз' if release_info['type'] == 'prerelease' else 'Стабильный'}\n\n"
"Изменения:\n"
f"{release_info['description']}\n\n"
"Хотите перейти на страницу загрузки?"
)
response = messagebox.askyesno( response = messagebox.askyesno(
"Доступно обновление", "Доступно обновление",
f"Доступна новая версия {release_info['version']}!\n\n" message,
f"Изменения:\n{release_info['description']}\n\n"
"Хотите перейти на страницу загрузки?",
) )
if response: if response:
webbrowser.open(release_info["download_url"]) webbrowser.open(release_info["link"])
else: else:
messagebox.showerror( messagebox.showerror(
"Ошибка", "Ошибка",

View File

@@ -69,7 +69,7 @@ class AboutWindow(tk.Toplevel):
ttk.Label( ttk.Label(
contact_frame, contact_frame,
text="Email: LowaWorkMail@gmail.com" text="Email: SPRF555@gmail.com"
).pack(anchor="w") ).pack(anchor="w")
telegram_link = ttk.Label( telegram_link = ttk.Label(

View File

@@ -1,175 +1,165 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import json
import logging import logging
import requests import requests
import threading import threading
import re
from packaging import version from packaging import version
import xml.etree.ElementTree as ET
import html
class UpdateCheckError(Exception): class UpdateCheckError(Exception):
"""Исключение для ошибок проверки обновлений""" """Исключение для ошибок проверки обновлений"""
pass pass
class ReleaseType:
"""Типы релизов"""
STABLE = "stable"
PRERELEASE = "prerelease"
class UpdateChecker: class UpdateChecker:
"""Класс для проверки обновлений программы""" """Класс для проверки обновлений программы"""
def __init__(self, current_version, repo_url): def __init__(self, current_version, repo_url, include_prereleases=False):
self.current_version = current_version self.current_version = current_version
self.repo_url = repo_url self.repo_url = repo_url
# Формируем базовый URL API self.include_prereleases = include_prereleases
self.api_url = repo_url.replace("gitea.filow.ru", "gitea.filow.ru/api/v1/repos/LowaSC/ComConfigCopy") self.rss_url = f"{repo_url}/releases.rss"
self._update_available = False self.release_info = None
self._latest_version = None
self._latest_release = None def _clean_html(self, html_text):
self._error = None """Очищает HTML-разметку и форматирует текст"""
self._changelog = None if not html_text:
return ""
def get_changelog(self, callback=None): text = re.sub(r'<[^>]+>', '', html_text)
""" text = html.unescape(text)
Получение changelog из репозитория. text = re.sub(r'\n\s*\n', '\n\n', text)
:param callback: Функция обратного вызова, которая будет вызвана после получения changelog return '\n'.join(line.strip() for line in text.splitlines()).strip()
"""
def fetch(): def _parse_release_info(self, item):
try: """Извлекает информацию о релизе из RSS item"""
# Пытаемся получить CHANGELOG.md из репозитория title = item.find('title').text if item.find('title') is not None else ''
response = requests.get(f"{self.api_url}/contents/CHANGELOG.md", timeout=10) link = item.find('link').text if item.find('link') is not None else ''
response.raise_for_status() description = item.find('description').text if item.find('description') is not None else ''
content = item.find('{http://purl.org/rss/1.0/modules/content/}encoded')
content = response.json() content_text = content.text if content is not None else ''
if "content" in content:
import base64
changelog_content = base64.b64decode(content["content"]).decode("utf-8")
self._changelog = changelog_content
self._error = None
else:
raise UpdateCheckError("Не удалось получить содержимое CHANGELOG.md")
except requests.RequestException as e:
error_msg = f"Ошибка получения changelog: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
self._changelog = None
except Exception as e:
error_msg = f"Неизвестная ошибка при получении changelog: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
self._changelog = None
finally:
if callback:
callback(self._changelog, self._error)
# Запускаем получение в отдельном потоке # Извлекаем версию и проверяем тип релиза из тега
threading.Thread(target=fetch, daemon=True).start() version_match = re.search(r'/releases/tag/(?:pre-)?v?(\d+\.\d+(?:\.\d+)?)', link)
if not version_match:
return None
version_str = version_match.group(1)
# Проверяем наличие префикса pre- в теге
is_prerelease = 'pre-' in link.lower()
# Форматируем название релиза
formatted_title = title
if title == version_str or not title.strip():
# Если заголовок пустой или совпадает с версией, создаем стандартное название
release_type = "Пре-релиз" if is_prerelease else "Версия"
formatted_title = f"{release_type} {version_str}"
elif not re.search(version_str, title):
# Если версия не указана в заголовке, добавляем её
formatted_title = f"{title} ({version_str})"
# Форматируем описание
formatted_description = self._clean_html(content_text or description)
if not formatted_description.strip():
formatted_description = "Нет описания"
# Добавляем метку типа релиза в начало описания
release_type_label = "[Пре-релиз] " if is_prerelease else ""
formatted_description = f"{release_type_label}{formatted_description}"
return {
'title': formatted_title,
'link': link,
'description': formatted_description,
'version': version_str,
'type': ReleaseType.PRERELEASE if is_prerelease else ReleaseType.STABLE
}
def check_updates(self, callback=None): def check_updates(self, callback=None):
""" """Проверяет наличие обновлений в асинхронном режиме"""
Проверка наличия обновлений. def check_worker():
:param callback: Функция обратного вызова, которая будет вызвана после проверки
"""
def check():
try: try:
response = requests.get(f"{self.api_url}/releases", timeout=10) logging.info(f"Текущая версия программы: {self.current_version}")
logging.info(f"Проверка пре-релизов: {self.include_prereleases}")
logging.info(f"Запрос RSS ленты: {self.rss_url}")
response = requests.get(self.rss_url, timeout=10)
response.raise_for_status() response.raise_for_status()
releases = response.json() root = ET.fromstring(response.content)
if not releases: items = root.findall('.//item')
raise UpdateCheckError("Не найдено релизов в репозитории") if not items:
raise UpdateCheckError("Релизы не найдены")
latest_release = releases[0] logging.info(f"Найдено {len(items)} релизов")
latest_version = latest_release.get("tag_name", "").lstrip("v")
if not latest_version: latest_version = None
raise UpdateCheckError("Не удалось определить версию последнего релиза") latest_info = None
try: for item in items:
if version.parse(latest_version) > version.parse(self.current_version): release_info = self._parse_release_info(item)
self._update_available = True if not release_info:
self._latest_version = latest_version continue
self._latest_release = latest_release
logging.info(f"Доступно обновление: {latest_version}") is_prerelease = release_info['type'] == ReleaseType.PRERELEASE
else: logging.info(
logging.info("Обновления не требуются") f"Проверка релиза: {release_info['title']}, "
except version.InvalidVersion as e: f"версия: {release_info['version']}, "
raise UpdateCheckError(f"Некорректный формат версии: {e}") f"тип: {'пре-релиз' if is_prerelease else 'стабильная'}"
)
self._error = None # Пропускаем пре-релизы если они не включены
if is_prerelease and not self.include_prereleases:
logging.info(f"Пропуск пре-релиза: {release_info['version']}")
continue
except requests.RequestException as e: # Сравниваем версии
error_msg = f"Ошибка сетевого подключения: {e}" try:
logging.error(error_msg, exc_info=True) current_ver = version.parse(latest_version or "0.0.0")
self._error = error_msg new_ver = version.parse(release_info['version'].split('-')[0]) # Убираем суффикс для сравнения
except UpdateCheckError as e:
logging.error(str(e), exc_info=True) if new_ver > current_ver:
self._error = str(e) latest_version = release_info['version']
except Exception as e: latest_info = release_info
error_msg = f"Неизвестная ошибка при проверке обновлений: {e}" logging.info(f"Новая версия: {latest_version}")
logging.error(error_msg, exc_info=True)
self._error = error_msg except version.InvalidVersion as e:
finally: logging.warning(f"Некорректный формат версии {release_info['version']}: {e}")
continue
if not latest_info:
raise UpdateCheckError("Не найдены подходящие версии")
self.release_info = latest_info
# Сравниваем с текущей версией
current_ver = version.parse(self.current_version)
latest_ver = version.parse(latest_version.split('-')[0])
update_available = latest_ver > current_ver
logging.info(f"Сравнение версий: текущая {current_ver} <-> последняя {latest_ver}")
logging.info(f"Доступно обновление: {update_available}")
if callback: if callback:
callback(self._update_available, self._error) callback(update_available, None)
@property except UpdateCheckError as e:
def update_available(self): logging.error(str(e))
"""Доступно ли обновление""" if callback:
return self._update_available callback(False, str(e))
except Exception as e:
@property logging.error(f"Ошибка при проверке обновлений: {e}", exc_info=True)
def latest_version(self): if callback:
"""Последняя доступная версия""" callback(False, str(e))
return self._latest_version
threading.Thread(target=check_worker, daemon=True).start()
@property
def error(self):
"""Последняя ошибка при проверке обновлений"""
return self._error
@property
def changelog(self):
"""Текущий changelog"""
return self._changelog
def get_release_notes(self): def get_release_notes(self):
"""Получение информации о последнем релизе""" """Возвращает информацию о последнем релизе"""
if self._latest_release: return self.release_info
return {
"version": self._latest_version,
"description": self._latest_release.get("body", ""),
"download_url": self._latest_release.get("assets", [{}])[0].get("browser_download_url", "")
}
return None
def get_releases(self, callback=None):
"""
Получение списка релизов из репозитория.
:param callback: Функция обратного вызова, которая будет вызвана после получения списка релизов
"""
def fetch():
try:
response = requests.get(f"{self.api_url}/releases", timeout=10)
response.raise_for_status()
releases = response.json()
if not releases:
raise UpdateCheckError("Не найдено релизов в репозитории")
self._error = None
if callback:
callback(releases, None)
except requests.RequestException as e:
error_msg = f"Ошибка сетевого подключения: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
if callback:
callback(None, error_msg)
except Exception as e:
error_msg = f"Ошибка при получении списка релизов: {e}"
logging.error(error_msg, exc_info=True)
self._error = error_msg
if callback:
callback(None, error_msg)
# Запускаем получение в отдельном потоке
threading.Thread(target=fetch, daemon=True).start()