Remove hardcoded libpython binaries and add debug step
All checks were successful
build / build-linux (push) Successful in 16s

This commit is contained in:
kdusek
2025-12-07 23:15:18 +01:00
parent 308ce7768e
commit 6a1fe63684
1807 changed files with 172293 additions and 1 deletions

View File

@@ -0,0 +1,36 @@
# -----------------------------------------------------------------------------
# Copyright (c) 2023, PyInstaller Development Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# The full license is in the file COPYING.txt, distributed with this software.
#
# SPDX-License-Identifier: Apache-2.0
# -----------------------------------------------------------------------------
import sys
import os
# A boolean indicating whether the frozen application is a macOS .app bundle.
is_macos_app_bundle = sys.platform == 'darwin' and sys._MEIPASS.endswith("Contents/Frameworks")
def prepend_path_to_environment_variable(path, variable_name):
"""
Prepend the given path to the list of paths stored in the given environment variable (separated by `os.pathsep`).
If the given path is already specified in the environment variable, no changes are made. If the environment variable
is not set or is empty, it is set/overwritten with the given path.
"""
stored_paths = os.environ.get(variable_name)
if stored_paths:
# If path is already included, make this a no-op. NOTE: we need to split the string and search in the list of
# substrings to find an exact match; searching in the original string might erroneously match a prefix of a
# longer (i.e., sub-directory) path when such entry already happens to be in PATH (see #8857).
if path in stored_paths.split(os.pathsep):
return
# Otherwise, prepend the path
stored_paths = path + os.pathsep + stored_paths
else:
stored_paths = path
os.environ[variable_name] = stored_paths

View File

@@ -0,0 +1,333 @@
# -----------------------------------------------------------------------------
# Copyright (c) 2023, PyInstaller Development Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# The full license is in the file COPYING.txt, distributed with this software.
#
# SPDX-License-Identifier: Apache-2.0
# -----------------------------------------------------------------------------
import ctypes
import ctypes.wintypes
# Constants from win32 headers
TOKEN_QUERY = 0x0008
TokenUser = 1 # from TOKEN_INFORMATION_CLASS enum
TokenAppContainerSid = 31 # from TOKEN_INFORMATION_CLASS enum
ERROR_INSUFFICIENT_BUFFER = 122
INVALID_HANDLE = -1
FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
SDDL_REVISION1 = 1
# Structures for ConvertSidToStringSidW
PSID = ctypes.wintypes.LPVOID
class SID_AND_ATTRIBUTES(ctypes.Structure):
_fields_ = [
("Sid", PSID),
("Attributes", ctypes.wintypes.DWORD),
]
class TOKEN_USER(ctypes.Structure):
_fields_ = [
("User", SID_AND_ATTRIBUTES),
]
PTOKEN_USER = ctypes.POINTER(TOKEN_USER)
class TOKEN_APPCONTAINER_INFORMATION(ctypes.Structure):
_fields_ = [
("TokenAppContainer", PSID),
]
PTOKEN_APPCONTAINER_INFORMATION = ctypes.POINTER(TOKEN_APPCONTAINER_INFORMATION)
# SECURITY_ATTRIBUTES structure for CreateDirectoryW
PSECURITY_DESCRIPTOR = ctypes.wintypes.LPVOID
class SECURITY_ATTRIBUTES(ctypes.Structure):
_fields_ = [
("nLength", ctypes.wintypes.DWORD),
("lpSecurityDescriptor", PSECURITY_DESCRIPTOR),
("bInheritHandle", ctypes.wintypes.BOOL),
]
# win32 API functions, bound via ctypes.
# NOTE: we do not use ctypes.windll.<dll_name> to avoid modifying its (global) function prototypes, which might affect
# user's code.
advapi32 = ctypes.WinDLL("advapi32")
kernel32 = ctypes.WinDLL("kernel32")
advapi32.ConvertSidToStringSidW.restype = ctypes.wintypes.BOOL
advapi32.ConvertSidToStringSidW.argtypes = (
PSID, # [in] PSID Sid
ctypes.POINTER(ctypes.wintypes.LPWSTR), # [out] LPWSTR *StringSid
)
advapi32.ConvertStringSecurityDescriptorToSecurityDescriptorW.restype = ctypes.wintypes.BOOL
advapi32.ConvertStringSecurityDescriptorToSecurityDescriptorW.argtypes = (
ctypes.wintypes.LPCWSTR, # [in] LPCWSTR StringSecurityDescriptor
ctypes.wintypes.DWORD, # [in] DWORD StringSDRevision
ctypes.POINTER(PSECURITY_DESCRIPTOR), # [out] PSECURITY_DESCRIPTOR *SecurityDescriptor
ctypes.wintypes.PULONG, # [out] PULONG SecurityDescriptorSize
)
advapi32.GetTokenInformation.restype = ctypes.wintypes.BOOL
advapi32.GetTokenInformation.argtypes = (
ctypes.wintypes.HANDLE, # [in] HANDLE TokenHandle
ctypes.c_int, # [in] TOKEN_INFORMATION_CLASS TokenInformationClass
ctypes.wintypes.LPVOID, # [out, optional] LPVOID TokenInformation
ctypes.wintypes.DWORD, # [in] DWORD TokenInformationLength
ctypes.wintypes.PDWORD, # [out] PDWORD ReturnLength
)
kernel32.CloseHandle.restype = ctypes.wintypes.BOOL
kernel32.CloseHandle.argtypes = (
ctypes.wintypes.HANDLE, # [in] HANDLE hObject
)
kernel32.CreateDirectoryW.restype = ctypes.wintypes.BOOL
kernel32.CreateDirectoryW.argtypes = (
ctypes.wintypes.LPCWSTR, # [in] LPCWSTR lpPathName
ctypes.POINTER(SECURITY_ATTRIBUTES), # [in, optional] LPSECURITY_ATTRIBUTES lpSecurityAttributes
)
kernel32.FormatMessageW.restype = ctypes.wintypes.DWORD
kernel32.FormatMessageW.argtypes = (
ctypes.wintypes.DWORD, # [in] DWORD dwFlags
ctypes.wintypes.LPCVOID, # [in, optional] LPCVOID lpSource
ctypes.wintypes.DWORD, # [in] DWORD dwMessageId
ctypes.wintypes.DWORD, # [in] DWORD dwLanguageId
ctypes.wintypes.LPWSTR, # [out] LPWSTR lpBuffer
ctypes.wintypes.DWORD, # [in] DWORD nSize
ctypes.wintypes.LPVOID, # [in, optional] va_list *Arguments
)
kernel32.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
# kernel32.GetCurrentProcess has no arguments
kernel32.GetLastError.restype = ctypes.wintypes.DWORD
# kernel32.GetLastError has no arguments
kernel32.LocalFree.restype = ctypes.wintypes.BOOL
kernel32.LocalFree.argtypes = (
ctypes.wintypes.HLOCAL, # [in] _Frees_ptr_opt_ HLOCAL hMem
)
kernel32.OpenProcessToken.restype = ctypes.wintypes.BOOL
kernel32.OpenProcessToken.argtypes = (
ctypes.wintypes.HANDLE, # [in] HANDLE ProcessHandle
ctypes.wintypes.DWORD, # [in] DWORD DesiredAccess
ctypes.wintypes.PHANDLE, # [out] PHANDLE TokenHandle
)
def _win_error_to_message(error_code):
"""
Convert win32 error code to message.
"""
message_wstr = ctypes.wintypes.LPWSTR(None)
ret = kernel32.FormatMessageW(
FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM,
None, # lpSource
error_code, # dwMessageId
0x400, # dwLanguageId = MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT)
ctypes.cast(
ctypes.byref(message_wstr),
ctypes.wintypes.LPWSTR,
), # pointer to LPWSTR due to FORMAT_MESSAGE_ALLOCATE_BUFFER; needs to be cast to LPWSTR
64, # due to FORMAT_MESSAGE_ALLOCATE_BUFFER, this is minimum number of characters to allocate
None,
)
if ret == 0:
return None
message = message_wstr.value
kernel32.LocalFree(message_wstr)
# Strip trailing CR/LF.
if message:
message = message.strip()
return message
def _get_process_sid(token_information_class):
"""
Obtain the SID from the current process by the given token information class.
Args:
token_information_class: Token information class identifying the SID that we're
interested in. Only TokenUser and TokenAppContainerSid are supported.
Returns: SID (if it could be fetched) or None if not available or on error.
"""
process_token = ctypes.wintypes.HANDLE(INVALID_HANDLE)
try:
# Get access token for the current process
ret = kernel32.OpenProcessToken(
kernel32.GetCurrentProcess(),
TOKEN_QUERY,
ctypes.pointer(process_token),
)
if ret == 0:
error_code = kernel32.GetLastError()
raise RuntimeError(f"Failed to open process token! Error code: 0x{error_code:X}")
# Query buffer size for sid
token_info_size = ctypes.wintypes.DWORD(0)
ret = advapi32.GetTokenInformation(
process_token,
token_information_class,
None,
0,
ctypes.byref(token_info_size),
)
# We expect this call to fail with ERROR_INSUFFICIENT_BUFFER
if ret == 0:
error_code = kernel32.GetLastError()
if error_code != ERROR_INSUFFICIENT_BUFFER:
raise RuntimeError(f"Failed to query token information buffer size! Error code: 0x{error_code:X}")
else:
raise RuntimeError("Unexpected return value from GetTokenInformation!")
# Allocate buffer
token_info = ctypes.create_string_buffer(token_info_size.value)
ret = advapi32.GetTokenInformation(
process_token,
token_information_class,
token_info,
token_info_size,
ctypes.byref(token_info_size),
)
if ret == 0:
error_code = kernel32.GetLastError()
raise RuntimeError(f"Failed to query token information! Error code: 0x{error_code:X}")
# Convert SID to string
# Technically, when UserToken is used, we need to pass user_info->User.Sid,
# but as they are at the beginning of the buffer, just pass the buffer instead...
sid_wstr = ctypes.wintypes.LPWSTR(None)
if token_information_class == TokenUser:
sid = ctypes.cast(token_info, PTOKEN_USER).contents.User.Sid
elif token_information_class == TokenAppContainerSid:
sid = ctypes.cast(token_info, PTOKEN_APPCONTAINER_INFORMATION).contents.TokenAppContainer
else:
raise ValueError(f"Unexpected token information class: {token_information_class}")
ret = advapi32.ConvertSidToStringSidW(sid, ctypes.pointer(sid_wstr))
if ret == 0:
error_code = kernel32.GetLastError()
raise RuntimeError(f"Failed to convert SID to string! Error code: 0x{error_code:X}")
sid = sid_wstr.value
kernel32.LocalFree(sid_wstr)
except Exception:
sid = None
finally:
# Close the process token
if process_token.value != INVALID_HANDLE:
kernel32.CloseHandle(process_token)
return sid
# Get and cache current user's SID
_user_sid = _get_process_sid(TokenUser)
# Get and cache current app container's SID (if any)
_app_container_sid = _get_process_sid(TokenAppContainerSid)
def secure_mkdir(dir_name):
"""
Replacement for mkdir that limits the access to created directory to current user.
"""
# Create security descriptor
# Prefer actual user SID over SID S-1-3-4 (current owner), because at the time of writing, Wine does not properly
# support the latter.
user_sid = _user_sid or "S-1-3-4"
# DACL descriptor (D):
# ace_type;ace_flags;rights;object_guid;inherit_object_guid;account_sid;(resource_attribute)
# - ace_type = SDDL_ACCESS_ALLOWED (A)
# - rights = SDDL_FILE_ALL (FA)
# - account_sid = current user (queried SID)
security_desc_str = f"D:(A;;FA;;;{user_sid})"
# If the app is running within an AppContainer, the app container SID has to be added to the DACL.
# Otherwise our process will not have access to the temp dir.
#
# Quoting https://learn.microsoft.com/en-us/windows/win32/secauthz/implementing-an-appcontainer:
# "The AppContainer SID is a persistent unique identifier for the appcontainer. ...
# To allow a single AppContainer to access a resource, add its AppContainerSID to the ACL for that resource."
if _app_container_sid:
security_desc_str += f"(A;;FA;;;{_app_container_sid})"
security_desc = ctypes.wintypes.LPVOID(None)
ret = advapi32.ConvertStringSecurityDescriptorToSecurityDescriptorW(
security_desc_str,
SDDL_REVISION1,
ctypes.byref(security_desc),
None,
)
if ret == 0:
error_code = kernel32.GetLastError()
raise RuntimeError(
f"Failed to create security descriptor! Error code: 0x{error_code:X}, "
f"message: {_win_error_to_message(error_code)}"
)
security_attr = SECURITY_ATTRIBUTES()
security_attr.nLength = ctypes.sizeof(SECURITY_ATTRIBUTES)
security_attr.lpSecurityDescriptor = security_desc
security_attr.bInheritHandle = False
# Create directory
ret = kernel32.CreateDirectoryW(
dir_name,
security_attr,
)
if ret == 0:
# Call failed; store error code immediately, to avoid it being overwritten in cleanup below.
error_code = kernel32.GetLastError()
# Free security descriptor
kernel32.LocalFree(security_desc)
# Exit on succeess
if ret != 0:
return
# Construct OSError from win error code
error_message = _win_error_to_message(error_code)
# Strip trailing dot to match error message from os.mkdir().
if error_message and error_message[-1] == '.':
error_message = error_message[:-1]
raise OSError(
None, # errno
error_message, # strerror
dir_name, # filename
error_code, # winerror
None, # filename2
)

View File

@@ -0,0 +1,118 @@
# -----------------------------------------------------------------------------
# Copyright (c) 2024, PyInstaller Development Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# The full license is in the file COPYING.txt, distributed with this software.
#
# SPDX-License-Identifier: Apache-2.0
# -----------------------------------------------------------------------------
import os
import importlib
import atexit
# Helper for ensuring that only one Qt bindings package is registered at run-time via run-time hooks.
_registered_qt_bindings = None
def ensure_single_qt_bindings_package(qt_bindings):
global _registered_qt_bindings
if _registered_qt_bindings is not None:
raise RuntimeError(
f"Cannot execute run-time hook for {qt_bindings!r} because run-time hook for {_registered_qt_bindings!r} "
"has been run before, and PyInstaller-frozen applications do not support multiple Qt bindings in the same "
"application!"
)
_registered_qt_bindings = qt_bindings
# Helper for relocating Qt prefix via embedded qt.conf file.
_QT_CONF_FILENAME = ":/qt/etc/qt.conf"
_QT_CONF_RESOURCE_NAME = (
# qt
b"\x00\x02"
b"\x00\x00\x07\x84"
b"\x00\x71"
b"\x00\x74"
# etc
b"\x00\x03"
b"\x00\x00\x6c\xa3"
b"\x00\x65"
b"\x00\x74\x00\x63"
# qt.conf
b"\x00\x07"
b"\x08\x74\xa6\xa6"
b"\x00\x71"
b"\x00\x74\x00\x2e\x00\x63\x00\x6f\x00\x6e\x00\x66"
)
_QT_CONF_RESOURCE_STRUCT = (
# :
b"\x00\x00\x00\x00\x00\x02\x00\x00\x00\x01\x00\x00\x00\x01"
# :/qt
b"\x00\x00\x00\x00\x00\x02\x00\x00\x00\x01\x00\x00\x00\x02"
# :/qt/etc
b"\x00\x00\x00\x0a\x00\x02\x00\x00\x00\x01\x00\x00\x00\x03"
# :/qt/etc/qt.conf
b"\x00\x00\x00\x16\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00"
)
def create_embedded_qt_conf(qt_bindings, prefix_path):
# The QtCore module might be unavailable if we collected just the top-level binding package (e.g., PyQt5) without
# any of its submodules. Since this helper is called from run-time hook for the binding package, we need to handle
# that scenario here.
try:
QtCore = importlib.import_module(qt_bindings + ".QtCore")
except ImportError:
return
# No-op if embedded qt.conf already exists
if QtCore.QFile.exists(_QT_CONF_FILENAME):
return
# Create qt.conf file that relocates Qt prefix.
# NOTE: paths should use POSIX-style forward slashes as separator, even on Windows.
if os.sep == '\\':
prefix_path = prefix_path.replace(os.sep, '/')
qt_conf = f"[Paths]\nPrefix = {prefix_path}\n"
if os.name == 'nt' and qt_bindings in {"PySide2", "PySide6"}:
# PySide PyPI wheels on Windows set LibraryExecutablesPath to PrefixPath
qt_conf += f"LibraryExecutables = {prefix_path}"
# Encode the contents; in Qt5, QSettings uses Latin1 encoding, in Qt6, it uses UTF8.
if qt_bindings in {"PySide2", "PyQt5"}:
qt_conf = qt_conf.encode("latin1")
else:
qt_conf = qt_conf.encode("utf-8")
# Prepend data size (32-bit integer, big endian)
qt_conf_size = len(qt_conf)
qt_resource_data = qt_conf_size.to_bytes(4, 'big') + qt_conf
# Register
succeeded = QtCore.qRegisterResourceData(
0x01,
_QT_CONF_RESOURCE_STRUCT,
_QT_CONF_RESOURCE_NAME,
qt_resource_data,
)
if not succeeded:
return # Tough luck
# Unregister the resource at exit, to ensure that the registered resource on Qt/C++ side does not outlive the
# `_qt_resource_data` python variable and its data buffer. This also adds a reference to the `_qt_resource_data`,
# which conveniently ensures that the data is not garbage collected before we perform the cleanup (otherwise garbage
# collector might kick in at any time after we exit this helper function, and `qRegisterResourceData` does not seem
# to make a copy of the data!).
atexit.register(
QtCore.qUnregisterResourceData,
0x01,
_QT_CONF_RESOURCE_STRUCT,
_QT_CONF_RESOURCE_NAME,
qt_resource_data,
)

View File

@@ -0,0 +1,56 @@
# -----------------------------------------------------------------------------
# Copyright (c) 2023, PyInstaller Development Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# The full license is in the file COPYING.txt, distributed with this software.
#
# SPDX-License-Identifier: Apache-2.0
# -----------------------------------------------------------------------------
import os
import sys
import errno
import tempfile
# Helper for creating temporary directories with access restricted to the user running the process.
# On POSIX systems, this is already achieved by `tempfile.mkdtemp`, which uses 0o700 permissions mask.
# On Windows, however, the POSIX permissions semantics have no effect, and we need to provide our own implementation
# that restricts the access by passing appropriate security attributes to the `CreateDirectory` function.
if os.name == 'nt':
from . import _win32
def secure_mkdtemp(suffix=None, prefix=None, dir=None):
"""
Windows-specific replacement for `tempfile.mkdtemp` that restricts access to the user running the process.
Based on `mkdtemp` implementation from python 3.11 stdlib.
"""
prefix, suffix, dir, output_type = tempfile._sanitize_params(prefix, suffix, dir)
names = tempfile._get_candidate_names()
if output_type is bytes:
names = map(os.fsencode, names)
for seq in range(tempfile.TMP_MAX):
name = next(names)
file = os.path.join(dir, prefix + name + suffix)
sys.audit("tempfile.mkdtemp", file)
try:
_win32.secure_mkdir(file)
except FileExistsError:
continue # try again
except PermissionError:
# This exception is thrown when a directory with the chosen name already exists on windows.
if (os.name == 'nt' and os.path.isdir(dir) and os.access(dir, os.W_OK)):
continue
else:
raise
return file
raise FileExistsError(errno.EEXIST, "No usable temporary directory name found")
else:
secure_mkdtemp = tempfile.mkdtemp

View File

@@ -0,0 +1,211 @@
# -----------------------------------------------------------------------------
# Copyright (c) 2005-2023, PyInstaller Development Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# The full license is in the file COPYING.txt, distributed with this software.
#
# SPDX-License-Identifier: Apache-2.0
# -----------------------------------------------------------------------------
# This module is not a "fake module" in the classical sense, but a real module that can be imported. It acts as an RPC
# interface for the functions of the bootloader.
"""
This module connects to the bootloader to send messages to the splash screen.
It is intended to act as a RPC interface for the functions provided by the bootloader, such as displaying text or
closing. This makes the users python program independent of how the communication with the bootloader is implemented,
since a consistent API is provided.
To connect to the bootloader, it connects to a local tcp socket whose port is passed through the environment variable
'_PYI_SPLASH_IPC'. The bootloader creates a server socket and accepts every connection request. Since the os-module,
which is needed to request the environment variable, is not available at boot time, the module does not establish the
connection until initialization.
The protocol by which the Python interpreter communicates with the bootloader is implemented in this module.
This module does not support reloads while the splash screen is displayed, i.e. it cannot be reloaded (such as by
importlib.reload), because the splash screen closes automatically when the connection to this instance of the module
is lost.
"""
import atexit
import os
# Import the _socket module instead of the socket module. All used functions to connect to the ipc system are
# provided by the C module and the users program does not necessarily need to include the socket module and all
# required modules it uses.
import _socket
__all__ = ["CLOSE_CONNECTION", "FLUSH_CHARACTER", "is_alive", "close", "update_text"]
try:
# The user might have excluded logging from imports.
import logging as _logging
except ImportError:
_logging = None
try:
# The user might have excluded functools from imports.
from functools import update_wrapper
except ImportError:
update_wrapper = None
# Utility
def _log(level, msg, *args, **kwargs):
"""
Conditional wrapper around logging module. If the user excluded logging from the imports or it was not imported,
this function should handle it and avoid using the logger.
"""
if _logging:
logger = _logging.getLogger(__name__)
logger.log(level, msg, *args, **kwargs)
# These constants define single characters which are needed to send commands to the bootloader. Those constants are
# also set in the tcl script.
CLOSE_CONNECTION = b'\x04' # ASCII End-of-Transmission character
FLUSH_CHARACTER = b'\x0D' # ASCII Carriage Return character
# Module internal variables
_initialized = False
# Keep these variables always synchronized
_ipc_socket_closed = True
_ipc_socket = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
def _initialize():
"""
Initialize this module
:return:
"""
global _initialized, _ipc_socket_closed
# If _ipc_port is zero, the splash screen is intentionally suppressed (for example, we are in sub-process spawned
# via sys.executable). Mark the splash screen as initialized, but do not attempt to connect.
if _ipc_port == 0:
_initialized = True
return
# Attempt to connect to the splash screen process.
try:
_ipc_socket.connect(("127.0.0.1", _ipc_port))
_ipc_socket_closed = False
_initialized = True
_log(10, "IPC connection to the splash screen was successfully established.") # log-level: debug
except OSError as err:
raise ConnectionError(f"Could not connect to TCP port {_ipc_port}.") from err
# We expect a splash screen from the bootloader, but if _PYI_SPLASH_IPC is not set, the module cannot connect to it.
# _PYI_SPLASH_IPC being set to zero indicates that splash screen should be (gracefully) suppressed; i.e., the calls
# in this module should become no-op without generating warning messages.
try:
_ipc_port = int(os.environ['_PYI_SPLASH_IPC'])
del os.environ['_PYI_SPLASH_IPC']
# Initialize the connection upon importing this module. This will establish a connection to the bootloader's TCP
# server socket.
_initialize()
except (KeyError, ValueError):
# log-level: warning
_log(
30,
"The environment does not allow connecting to the splash screen. Did bootloader fail to initialize it?",
exc_info=True,
)
except ConnectionError:
# log-level: error
_log(40, "Failed to connect to the bootloader's IPC server!", exc_info=True)
def _check_connection(func):
"""
Utility decorator for checking whether the function should be executed.
The wrapped function may raise a ConnectionError if the module was not initialized correctly.
"""
def wrapper(*args, **kwargs):
"""
Executes the wrapped function if the environment allows it.
That is, if the connection to to bootloader has not been closed and the module is initialized.
:raises RuntimeError: if the module was not initialized correctly.
"""
if _initialized and _ipc_socket_closed:
if _ipc_port != 0:
_log(10, "Connection to splash screen has already been closed.") # log-level: debug
return
elif not _initialized:
raise RuntimeError("This module is not initialized; did it fail to load?")
return func(*args, **kwargs)
if update_wrapper:
# For runtime introspection
update_wrapper(wrapper, func)
return wrapper
@_check_connection
def _send_command(cmd, args=None):
"""
Send the command followed by args to the splash screen.
:param str cmd: The command to send. All command have to be defined as procedures in the tcl splash screen script.
:param list[str] args: All arguments to send to the receiving function
"""
if args is None:
args = []
full_cmd = "%s(%s)" % (cmd, " ".join(args))
try:
_ipc_socket.sendall(full_cmd.encode("utf-8") + FLUSH_CHARACTER)
except OSError as err:
raise ConnectionError(f"Unable to send command {full_cmd!r} to the bootloader") from err
def is_alive():
"""
Indicates whether the module can be used.
Returns False if the module is either not initialized or was disabled by closing the splash screen. Otherwise,
the module should be usable.
"""
return _initialized and not _ipc_socket_closed
@_check_connection
def update_text(msg: str):
"""
Updates the text on the splash screen window.
:param str msg: the text to be displayed
:raises ConnectionError: If the OS fails to write to the socket.
:raises RuntimeError: If the module is not initialized.
"""
_send_command("update_text", [msg])
def close():
"""
Close the connection to the ipc tcp server socket.
This will close the splash screen and renders this module unusable. After this function is called, no connection
can be opened to the splash screen again and all functions in this module become unusable.
"""
global _ipc_socket_closed
if _initialized and not _ipc_socket_closed:
_ipc_socket.sendall(CLOSE_CONNECTION)
_ipc_socket.close()
_ipc_socket_closed = True
@atexit.register
def _exit():
close()