# SPDX-FileCopyrightText: 2023 Blender Foundation
#
# SPDX-License-Identifier: GPL-2.0-or-later

"""
Non-blocking access to package management.

- No ``bpy`` module use.
"""

__all__ = (
    # Public Repository Actions.
    "repo_sync",
    "repo_upgrade",
    "repo_listing",

    # Public Package Actions.
    "pkg_install_files",
    "pkg_install",
    "pkg_uninstall",

    "pkg_make_obsolete_for_testing",

    "dummy_progress",

    # Public Stand-Alone Utilities.
    "pkg_theme_file_list",
    "file_mtime_or_none",

    # Public API.
    "json_from_filepath",
    "toml_from_filepath",
    "json_to_filepath",

    "pkg_manifest_dict_is_valid_or_error",
    "pkg_manifest_dict_from_file_or_error",
    "pkg_manifest_archive_url_abs_from_repo_url",

    "CommandBatch",
    "RepoCacheStore",

    # Directory Lock.
    "RepoLock",
    "RepoLockContext",
)

import json
import os
import sys
import signal
import stat
import subprocess
import time
import tomllib

from typing import (
    Any,
    Callable,
    Generator,
    IO,
    List,
    Optional,
    Dict,
    NamedTuple,
    Sequence,
    Set,
    Tuple,
    Union,
)

BASE_DIR = os.path.abspath(os.path.dirname(__file__))

BLENDER_EXT_CMD = (
    # When run from within Blender this will point to Blender's local Python binary.
    sys.executable,
    os.path.normpath(os.path.join(BASE_DIR, "cli", "blender_ext.py")),
)

# This directory is in the local repository.
REPO_LOCAL_PRIVATE_DIR = ".blender_ext"
# Located inside `REPO_LOCAL_PRIVATE_DIR`.
REPO_LOCAL_PRIVATE_LOCK = "bl_ext_repo.lock"

PKG_REPO_LIST_FILENAME = "bl_ext_repo.json"
PKG_MANIFEST_FILENAME_TOML = "blender_manifest.toml"
PKG_EXT = ".zip"

# The relative path of the local JSON file within a repository directory.
REPO_LOCAL_JSON = os.path.join(REPO_LOCAL_PRIVATE_DIR, PKG_REPO_LIST_FILENAME)

# An item we communicate back to Blender.
InfoItem = Tuple[str, Any]
InfoItemSeq = Sequence[InfoItem]

COMPLETE_ITEM = ('DONE', "")

# Time to wait when there is no output, avoid 0 as it causes high CPU usage.
IDLE_WAIT_ON_READ = 0.05
# IDLE_WAIT_ON_READ = 0.2


# -----------------------------------------------------------------------------
# Internal Functions.
#

if sys.platform == "win32":
    # See: https://stackoverflow.com/a/35052424/432509
    def file_handle_make_non_blocking(file_handle: IO[bytes]) -> None:
        # These constants could be defined globally but avoid polluting the name-space,
        # thanks to: https://stackoverflow.com/questions/34504970
        import msvcrt
        from ctypes import (
            POINTER,
            WinError,
            byref,
            windll,
            wintypes,
        )
        from ctypes.wintypes import (
            BOOL,
            DWORD,
            HANDLE,
        )

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        # Set non-blocking.
        SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
        SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
        SetNamedPipeHandleState.restype = BOOL
        os_handle = msvcrt.get_osfhandle(file_handle.fileno())
        res = windll.kernel32.SetNamedPipeHandleState(os_handle, byref(PIPE_NOWAIT), None, None)
        if res == 0:
            print(WinError())

    def file_handle_non_blocking_is_error_blocking(ex: BaseException) -> bool:
        if not isinstance(ex, OSError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232
        # This is sometimes zero, `ex.args == (22, "Invalid argument")`.
        # This could be checked but for now ignore all zero errors.
        return (GetLastError() in {0, ERROR_NO_DATA})

else:
    def file_handle_make_non_blocking(file_handle: IO[bytes]) -> None:
        import fcntl
        # Get current `file_handle` flags.
        flags = fcntl.fcntl(file_handle.fileno(), fcntl.F_GETFL)
        fcntl.fcntl(file_handle, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def file_handle_non_blocking_is_error_blocking(ex: BaseException) -> bool:
        if not isinstance(ex, BlockingIOError):
            return False
        return True


def file_mtime_or_none(filepath: str) -> Optional[int]:
    try:
        # For some reason `mypy` thinks this is a float.
        return int(os.stat(filepath)[stat.ST_MTIME])
    except FileNotFoundError:
        return None


def scandir_with_demoted_errors(path: str) -> Generator[os.DirEntry[str], None, None]:
    try:
        for entry in os.scandir(path):
            yield entry
    except BaseException as ex:
        print("Error: scandir", ex)


# -----------------------------------------------------------------------------
# Call JSON.
#

def non_blocking_call(cmd: Sequence[str]) -> subprocess.Popen[bytes]:
    # pylint: disable-next=consider-using-with
    ps = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    stdout = ps.stdout
    assert stdout is not None
    # Needed so whatever is available can be read (without waiting).
    file_handle_make_non_blocking(stdout)
    return ps


def command_output_from_json_0(
        args: Sequence[str],
        use_idle: bool,
) -> Generator[InfoItemSeq, bool, None]:
    cmd = [*BLENDER_EXT_CMD, *args, "--output-type=JSON_0"]
    ps = non_blocking_call(cmd)
    stdout = ps.stdout
    assert stdout is not None
    chunk_list = []
    request_exit_signal_sent = False

    while True:
        # It's possible this is multiple chunks.
        try:
            chunk = stdout.read()
        except BaseException as ex:
            if not file_handle_non_blocking_is_error_blocking(ex):
                raise ex
            chunk = b''

        json_messages = []

        if not chunk:
            if ps.poll() is not None:
                break
            if use_idle:
                time.sleep(IDLE_WAIT_ON_READ)
        elif (chunk_zero_index := chunk.find(b'\0')) == -1:
            chunk_list.append(chunk)
        else:
            # Extract contiguous data from `chunk_list`.
            chunk_list.append(chunk[:chunk_zero_index])

            json_bytes_list = [b''.join(chunk_list)]
            chunk_list.clear()

            # There may be data afterwards, even whole chunks.
            if chunk_zero_index + 1 != len(chunk):
                chunk = chunk[chunk_zero_index + 1:]
                # Add whole chunks.
                while (chunk_zero_index := chunk.find(b'\0')) != -1:
                    json_bytes_list.append(chunk[:chunk_zero_index])
                    chunk = chunk[chunk_zero_index + 1:]
                if chunk:
                    chunk_list.append(chunk)

            request_exit = False

            for json_bytes in json_bytes_list:
                json_data = json.loads(json_bytes.decode("utf-8"))

                assert len(json_data) == 2
                assert isinstance(json_data[0], str)

                json_messages.append((json_data[0], json_data[1]))

        # Yield even when `json_messages` is empty, otherwise this generator can block.
        # It also means a request to exit might not be responded to soon enough.
        request_exit = yield json_messages
        if request_exit and not request_exit_signal_sent:
            ps.send_signal(signal.SIGINT)
            request_exit_signal_sent = True
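

# A minimal sketch of driving the generator above by hand. This helper is an
# illustrative addition (not part of Blender's API): `send()` passes the exit
# request back in, mirroring how `CommandBatch` consumes these generators below.
def _example_drive_generator() -> None:
    gen = command_output_from_json_0(["dummy-progress", "--time-duration=1.0"], use_idle=True)
    request_exit: Optional[bool] = None  # Must be None for the first `send()`.
    try:
        while True:
            for ty, msg in gen.send(request_exit):  # type: ignore
                print(ty, msg)
            request_exit = False  # Set to True to ask the subprocess to exit.
    except StopIteration:
        pass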


# -----------------------------------------------------------------------------
# Internal Functions.
#

def repositories_validate_or_errors(repos: Sequence[str]) -> Optional[InfoItemSeq]:
    return None


# -----------------------------------------------------------------------------
# Public Stand-Alone Utilities

def pkg_theme_file_list(directory: str, pkg_idname: str) -> Tuple[str, List[str]]:
    theme_dir = os.path.join(directory, pkg_idname)
    theme_files = [
        filename for entry in os.scandir(theme_dir)
        if ((not entry.is_dir()) and
            (not (filename := entry.name).startswith(".")) and
            filename.lower().endswith(".xml"))
    ]
    theme_files.sort()
    return theme_dir, theme_files


# -----------------------------------------------------------------------------
# Public Repository Actions

def repo_sync(
        *,
        directory: str,
        repo_url: str,
        online_user_agent: str,
        use_idle: bool,
        force_exit_ok: bool = False,
) -> Generator[InfoItemSeq, None, None]:
    """
    Implementation:
    ``bpy.ops.ext.repo_sync(directory)``.
    """
    yield from command_output_from_json_0([
        "sync",
        "--local-dir", directory,
        "--repo-dir", repo_url,
        "--online-user-agent", online_user_agent,
        *(("--force-exit-ok",) if force_exit_ok else ()),
    ], use_idle=use_idle)
    yield [COMPLETE_ITEM]


def repo_upgrade(
        *,
        directory: str,
        repo_url: str,
        online_user_agent: str,
        use_idle: bool,
) -> Generator[InfoItemSeq, None, None]:
    """
    Implementation:
    ``bpy.ops.ext.repo_upgrade(directory)``.
    """
    yield from command_output_from_json_0([
        "upgrade",
        "--local-dir", directory,
        "--repo-dir", repo_url,
        "--online-user-agent", online_user_agent,
    ], use_idle=use_idle)
    yield [COMPLETE_ITEM]


def repo_listing(
        *,
        repos: Sequence[str],
) -> Generator[InfoItemSeq, None, None]:
    """
    Implementation:
    ``bpy.ops.ext.repo_listing(directory)``.
    """
    if result := repositories_validate_or_errors(repos):
        yield result
        return

    yield [COMPLETE_ITEM]


# -----------------------------------------------------------------------------
# Public Package Actions

def pkg_install_files(
        *,
        directory: str,
        files: Sequence[str],
        use_idle: bool,
) -> Generator[InfoItemSeq, None, None]:
    """
    Implementation:
    ``bpy.ops.ext.pkg_install_files(directory, files)``.
    """
    yield from command_output_from_json_0([
        "install-files", *files,
        "--local-dir", directory,
    ], use_idle=use_idle)
    yield [COMPLETE_ITEM]


def pkg_install(
        *,
        directory: str,
        repo_url: str,
        pkg_id_sequence: Sequence[str],
        online_user_agent: str,
        use_cache: bool,
        use_idle: bool,
) -> Generator[InfoItemSeq, None, None]:
    """
    Implementation:
    ``bpy.ops.ext.pkg_install(directory, pkg_id)``.
    """
    yield from command_output_from_json_0([
        "install", ",".join(pkg_id_sequence),
        "--local-dir", directory,
        "--repo-dir", repo_url,
        "--online-user-agent", online_user_agent,
        "--local-cache", str(int(use_cache)),
    ], use_idle=use_idle)
    yield [COMPLETE_ITEM]


def pkg_uninstall(
        *,
        directory: str,
        pkg_id_sequence: Sequence[str],
        use_idle: bool,
) -> Generator[InfoItemSeq, None, None]:
    """
    Implementation:
    ``bpy.ops.ext.pkg_uninstall(directory, pkg_id)``.
    """
    yield from command_output_from_json_0([
        "uninstall", ",".join(pkg_id_sequence),
        "--local-dir", directory,
    ], use_idle=use_idle)
    yield [COMPLETE_ITEM]


# -----------------------------------------------------------------------------
# Public Demo Actions

def dummy_progress(
        *,
        use_idle: bool,
) -> Generator[InfoItemSeq, bool, None]:
    """
    Implementation:
    ``bpy.ops.ext.dummy_progress()``.
    """
    yield from command_output_from_json_0(["dummy-progress", "--time-duration=1.0"], use_idle=use_idle)
    yield [COMPLETE_ITEM]
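

# A minimal usage sketch (an illustrative addition, not used by Blender itself):
# iterate the generator and handle each (type, message) pair; the final
# iteration yields `COMPLETE_ITEM`.
def _example_dummy_progress() -> None:
    for messages in dummy_progress(use_idle=True):
        for ty, msg in messages:
            print(ty, msg)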


# -----------------------------------------------------------------------------
# Public (non-command-line-wrapping) functions

def json_from_filepath(filepath_json: str) -> Optional[Dict[str, Any]]:
    if os.path.exists(filepath_json):
        with open(filepath_json, "r", encoding="utf-8") as fh:
            result = json.loads(fh.read())
            assert isinstance(result, dict)
            return result
    return None


def toml_from_filepath(filepath_toml: str) -> Optional[Dict[str, Any]]:
    if os.path.exists(filepath_toml):
        with open(filepath_toml, "r", encoding="utf-8") as fh:
            return tomllib.loads(fh.read())
    return None


def json_to_filepath(filepath_json: str, data: Any) -> None:
    with open(filepath_json, "w", encoding="utf-8") as fh:
        fh.write(json.dumps(data))


def pkg_make_obsolete_for_testing(local_dir: str, pkg_id: str) -> None:
    import re
    filepath = os.path.join(local_dir, pkg_id, PKG_MANIFEST_FILENAME_TOML)
    # Weak! Use basic matching to replace the version, not nice but OK as a debugging option.
    with open(filepath, "r", encoding="utf-8") as fh:
        data = fh.read()

    def key_replace(match: re.Match[str]) -> str:
        return "version = \"0.0.0\""

    data = re.sub(r"^\s*version\s*=\s*\"[^\"]+\"", key_replace, data, flags=re.MULTILINE)
    with open(filepath, "w", encoding="utf-8") as fh:
        fh.write(data)


def pkg_manifest_dict_is_valid_or_error(
        data: Dict[str, Any],
        from_repo: bool,
) -> Optional[str]:
    # Exception! In general `cli` shouldn't be considered a Python module,
    # however its validation function is handy to reuse.
    from .cli.blender_ext import pkg_manifest_from_dict_and_validate
    assert "id" in data
    result = pkg_manifest_from_dict_and_validate(data, from_repo=from_repo)
    if isinstance(result, str):
        return result
    return None


def pkg_manifest_dict_from_file_or_error(
        filepath: str,
) -> Union[Dict[str, Any], str]:
    from .cli.blender_ext import pkg_manifest_from_archive_and_validate
    result = pkg_manifest_from_archive_and_validate(filepath)
    if isinstance(result, str):
        return result
    # Else convert the named-tuple into a dictionary.
    result_dict = result._asdict()
    assert isinstance(result_dict, dict)
    return result_dict


def pkg_manifest_archive_url_abs_from_repo_url(repo_url: str, archive_url: str) -> str:
    if archive_url.startswith("./"):
        if (
                len(repo_url) > len(PKG_REPO_LIST_FILENAME) and
                repo_url.endswith(PKG_REPO_LIST_FILENAME) and
                (repo_url[-(len(PKG_REPO_LIST_FILENAME) + 1)] in {"\\", "/"})
        ):
            # The URL contains the JSON name, strip this off before adding the package name.
            archive_url = repo_url[:-len(PKG_REPO_LIST_FILENAME)] + archive_url[2:]
        elif repo_url.startswith(("http://", "https://", "file://")):
            # Simply add to the URL.
            archive_url = repo_url.rstrip("/") + archive_url[1:]
        else:
            # Handle as a regular path.
            archive_url = os.path.join(repo_url, archive_url[2:])
    return archive_url
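

# Worked examples of the resolution above (the URLs are hypothetical):
#
#   pkg_manifest_archive_url_abs_from_repo_url(
#       "https://example.org/repo/bl_ext_repo.json", "./pkg.zip")
#   -> "https://example.org/repo/pkg.zip" (the JSON name is stripped).
#
#   pkg_manifest_archive_url_abs_from_repo_url(
#       "https://example.org/repo", "./pkg.zip")
#   -> "https://example.org/repo/pkg.zip" (appended to the URL).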


def pkg_repo_cache_clear(local_dir: str) -> None:
    local_cache_dir = os.path.join(local_dir, ".blender_ext", "cache")
    if not os.path.isdir(local_cache_dir):
        return

    for entry in scandir_with_demoted_errors(local_cache_dir):
        if entry.is_dir(follow_symlinks=False):
            continue
        if not entry.name.endswith(PKG_EXT):
            continue

        # Should never fail unless the file-system has permissions issues or corruption.
        try:
            os.unlink(entry.path)
        except BaseException as ex:
            print("Error: unlink", ex)


# -----------------------------------------------------------------------------
# Public Command Pool (non-command-line wrapper)

InfoItemCallable = Callable[[], Generator[InfoItemSeq, bool, None]]


class CommandBatchItem:
    __slots__ = (
        "fn_with_args",
        "fn_iter",
        "status",
        "has_error",
        "has_warning",
        "msg_log",
        "msg_log_len_last",

        "msg_type",
        "msg_info",
    )

    STATUS_NOT_YET_STARTED = 0
    STATUS_RUNNING = 1
    STATUS_COMPLETE = 2

    def __init__(self, fn_with_args: InfoItemCallable):
        self.fn_with_args = fn_with_args
        self.fn_iter: Optional[Generator[InfoItemSeq, bool, None]] = None
        self.status = CommandBatchItem.STATUS_NOT_YET_STARTED
        self.has_error = False
        self.has_warning = False
        self.msg_log: List[Tuple[str, Any]] = []
        self.msg_log_len_last = 0
        self.msg_type = ""
        self.msg_info = ""

    def invoke(self) -> Generator[InfoItemSeq, bool, None]:
        return self.fn_with_args()


class CommandBatch_ExecNonBlockingResult(NamedTuple):
    # A message list for each command, aligned to `CommandBatch._batch`.
    messages: Tuple[List[Tuple[str, str]], ...]
    # When true, the status of all commands is `CommandBatchItem.STATUS_COMPLETE`.
    all_complete: bool
    # When true, `calc_status_data` will return a different result.
    status_data_changed: bool


class CommandBatch_StatusFlag(NamedTuple):
    flag: int
    failure_count: int
    count: int


class CommandBatch:
    __slots__ = (
        "title",

        "_batch",
        "_request_exit",
        "_log_added_since_accessed",
    )

    def __init__(
            self,
            *,
            title: str,
            batch: Sequence[InfoItemCallable],
    ):
        self.title = title
        self._batch = [CommandBatchItem(fn_with_args) for fn_with_args in batch]
        self._request_exit = False
        self._log_added_since_accessed = True

    def _exec_blocking_single(
            self,
            report_fn: Callable[[str, str], None],
            request_exit_fn: Callable[[], bool],
    ) -> bool:
        for cmd in self._batch:
            assert cmd.fn_iter is None
            cmd.fn_iter = cmd.invoke()
            request_exit: Optional[bool] = None
            while True:
                try:
                    # `request_exit` starts off as None, then it's a boolean.
                    json_messages = cmd.fn_iter.send(request_exit)  # type: ignore
                except StopIteration:
                    break

                for ty, msg in json_messages:
                    report_fn(ty, msg)

                if request_exit is None:
                    request_exit = False

            if request_exit is True:
                break
        if request_exit is None:
            return True
        return request_exit

    def _exec_blocking_multi(
            self,
            *,
            report_fn: Callable[[str, str], None],
            request_exit_fn: Callable[[], bool],
    ) -> bool:
        # TODO, concurrent execution.
        return self._exec_blocking_single(report_fn, request_exit_fn)

    def exec_blocking(
            self,
            report_fn: Callable[[str, str], None],
            request_exit_fn: Callable[[], bool],
            concurrent: bool,
    ) -> bool:
        # Blocking execution & finish.
        if concurrent:
            return self._exec_blocking_multi(
                report_fn=report_fn,
                request_exit_fn=request_exit_fn,
            )
        return self._exec_blocking_single(report_fn, request_exit_fn)

    def exec_non_blocking(
            self,
            *,
            request_exit: bool,
    ) -> CommandBatch_ExecNonBlockingResult:
        """
        Return the result of running multiple commands.
        """
        command_output: Tuple[List[Tuple[str, str]], ...] = tuple([] for _ in range(len(self._batch)))

        if request_exit:
            self._request_exit = True

        status_data_changed = False

        complete_count = 0
        for cmd_index in reversed(range(len(self._batch))):
            cmd = self._batch[cmd_index]
            if cmd.status == CommandBatchItem.STATUS_COMPLETE:
                complete_count += 1
                continue

            send_arg: Optional[bool] = self._request_exit

            # First time initialization.
            if cmd.fn_iter is None:
                cmd.fn_iter = cmd.invoke()
                cmd.status = CommandBatchItem.STATUS_RUNNING
                status_data_changed = True
                send_arg = None

            try:
                json_messages = cmd.fn_iter.send(send_arg)  # type: ignore
            except StopIteration:
                # FIXME: This should not happen, we should get a "DONE" instead.
                cmd.status = CommandBatchItem.STATUS_COMPLETE
                complete_count += 1
                status_data_changed = True
                continue

            if json_messages:
                for ty, msg in json_messages:
                    self._log_added_since_accessed = True

                    cmd.msg_type = ty
                    cmd.msg_info = msg
                    if ty == 'DONE':
                        assert msg == ""
                        cmd.status = CommandBatchItem.STATUS_COMPLETE
                        complete_count += 1
                        status_data_changed = True
                        break

                    command_output[cmd_index].append((ty, msg))
                    if ty != 'PROGRESS':
                        if ty == 'ERROR':
                            if not cmd.has_error:
                                cmd.has_error = True
                                status_data_changed = True
                        elif ty == 'WARNING':
                            if not cmd.has_warning:
                                cmd.has_warning = True
                                status_data_changed = True
                        cmd.msg_log.append((ty, msg))

        # Check if all are complete.
        assert complete_count == len([cmd for cmd in self._batch if cmd.status == CommandBatchItem.STATUS_COMPLETE])
        all_complete = (complete_count == len(self._batch))
        return CommandBatch_ExecNonBlockingResult(
            messages=command_output,
            all_complete=all_complete,
            status_data_changed=status_data_changed,
        )

    def calc_status_string(self) -> List[str]:
        return [
            "{:s}: {:s}".format(cmd.msg_type, cmd.msg_info)
            for cmd in self._batch if (cmd.msg_type or cmd.msg_info)
        ]

    def calc_status_data(self) -> CommandBatch_StatusFlag:
        """
        Return a summary status for all commands.
        """
        status_flag = 0
        failure_count = 0
        for cmd in self._batch:
            status_flag |= 1 << cmd.status
            if cmd.has_error or cmd.has_warning:
                failure_count += 1
        return CommandBatch_StatusFlag(
            flag=status_flag,
            failure_count=failure_count,
            count=len(self._batch),
        )

    @staticmethod
    def calc_status_text_icon_from_data(status_data: CommandBatch_StatusFlag, update_count: int) -> Tuple[str, str]:
        # Generate a nice UI string for a status-bar & splash screen (must be short).
        #
        # NOTE: this is (arguably) UI logic, it's just nice to have it here
        # as it avoids using low-level flags externally.
        #
        # FIXME: this text assumes a "sync" operation.
        if status_data.failure_count == 0:
            fail_text = ""
        elif status_data.failure_count == status_data.count:
            fail_text = ", failed"
        else:
            fail_text = ", some actions failed"

        if status_data.flag == 1 << CommandBatchItem.STATUS_NOT_YET_STARTED:
            return "Starting Extension Updates{:s}".format(fail_text), 'SORTTIME'
        if status_data.flag == 1 << CommandBatchItem.STATUS_COMPLETE:
            if update_count > 0:
                # NOTE: the UI design in #120612 shows the number of available extensions in the icon.
                # Include it in the text as this is not yet supported.
                return "Extensions Updates Available ({:d}){:s}".format(update_count, fail_text), 'URL'
            return "All Extensions Up-to-date{:s}".format(fail_text), 'CHECKMARK'
        if status_data.flag & 1 << CommandBatchItem.STATUS_RUNNING:
            return "Checking for Extension Updates{:s}".format(fail_text), 'SORTTIME'

        # Should never reach this line!
        return "Internal error, unknown state!{:s}".format(fail_text), 'ERROR'

    def calc_status_log_or_none(self) -> Optional[List[Tuple[str, str]]]:
        """
        Return the log or None if there were no changes since the last call.
        """
        if self._log_added_since_accessed is False:
            return None
        self._log_added_since_accessed = False

        return [
            (ty, msg)
            for cmd in self._batch
            for ty, msg in (cmd.msg_log + ([(cmd.msg_type, cmd.msg_info)] if cmd.msg_type == 'PROGRESS' else []))
        ]

    def calc_status_log_since_last_request_or_none(self) -> Optional[List[List[Tuple[str, str]]]]:
        """
        Return a list of new log messages per command, or None when none are found.
        """
        result: List[List[Tuple[str, str]]] = [[] for _ in range(len(self._batch))]
        found = False
        for cmd_index, cmd in enumerate(self._batch):
            msg_log_len = len(cmd.msg_log)
            if cmd.msg_log_len_last == msg_log_len:
                continue
            assert cmd.msg_log_len_last < msg_log_len
            result[cmd_index] = cmd.msg_log[cmd.msg_log_len_last:]
            cmd.msg_log_len_last = len(cmd.msg_log)
            found = True

        return result if found else None
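

# A minimal sketch of polling `CommandBatch` (an illustrative addition, the
# directory/URL values are hypothetical): read the named fields of
# `CommandBatch_ExecNonBlockingResult` rather than unpacking positionally.
def _example_command_batch() -> None:
    import functools
    batch = CommandBatch(
        title="Sync",
        batch=[
            functools.partial(
                repo_sync,
                directory="/path/to/repo",
                repo_url="https://example.org/repo",
                online_user_agent="example-agent",
                use_idle=False,
            ),
        ],
    )
    while True:
        result = batch.exec_non_blocking(request_exit=False)
        for cmd_messages in result.messages:
            for ty, msg in cmd_messages:
                print(ty, msg)
        if result.all_complete:
            break
        time.sleep(IDLE_WAIT_ON_READ)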


# -----------------------------------------------------------------------------
# Public Repo Cache (non-command-line wrapper)

class _RepoCacheEntry:
    __slots__ = (
        "directory",
        "repo_url",

        "_pkg_manifest_local",
        "_pkg_manifest_remote",
        "_pkg_manifest_remote_mtime",
        "_pkg_manifest_remote_has_warning",
    )

    def __init__(self, directory: str, repo_url: str) -> None:
        assert directory != ""
        self.directory = directory
        self.repo_url = repo_url
        # Manifest data per package loaded from the package's local JSON.
        self._pkg_manifest_local: Optional[Dict[str, Dict[str, Any]]] = None
        self._pkg_manifest_remote: Optional[Dict[str, Dict[str, Any]]] = None
        self._pkg_manifest_remote_mtime = 0
        # Avoid many noisy prints.
        self._pkg_manifest_remote_has_warning = False

    def _json_data_ensure(
            self,
            *,
            error_fn: Callable[[BaseException], None],
            check_files: bool = False,
            ignore_missing: bool = False,
    ) -> Any:
        if self._pkg_manifest_remote is not None:
            if check_files:
                self._json_data_refresh(error_fn=error_fn)
            return self._pkg_manifest_remote

        filepath_json = os.path.join(self.directory, REPO_LOCAL_JSON)

        try:
            self._pkg_manifest_remote = json_from_filepath(filepath_json)
        except BaseException as ex:
            self._pkg_manifest_remote = None
            error_fn(ex)

        self._pkg_manifest_local = None
        if self._pkg_manifest_remote is not None:
            json_mtime = file_mtime_or_none(filepath_json)
            assert json_mtime is not None
            self._pkg_manifest_remote_mtime = json_mtime
            self._pkg_manifest_local = None
            self._pkg_manifest_remote_has_warning = False
        else:
            if not ignore_missing:
                # NOTE: this warning will occur when setting up a new repository.
                # It could be removed but it's also useful to know when the JSON is missing.
                if self.repo_url:
                    if not self._pkg_manifest_remote_has_warning:
                        print("Repository file:", filepath_json, "not found, sync required!")
                        self._pkg_manifest_remote_has_warning = True

        return self._pkg_manifest_remote

    def _json_data_refresh_from_toml(
            self,
            *,
            error_fn: Callable[[BaseException], None],
            force: bool = False,
    ) -> None:
        assert self.repo_url == ""
        # Since there is no remote repo, the ID name is defined by the directory name only.
        local_json_data = self.pkg_manifest_from_local_ensure(error_fn=error_fn)
        if local_json_data is None:
            return

        filepath_json = os.path.join(self.directory, REPO_LOCAL_JSON)

        # We might want to adjust where this happens, create the directory here
        # because this could be a fresh repo which might not have been initialized until now.
        directory = os.path.dirname(filepath_json)
        try:
            # A symbolic-link that's followed (good), if it exists and is a file,
            # an error is raised here and returned.
            if not os.path.isdir(directory):
                os.makedirs(directory, exist_ok=True)
        except BaseException as ex:
            error_fn(ex)
            return
        del directory

        with open(filepath_json, "w", encoding="utf-8") as fh:
            # Indent because it can be useful to check this file if there are any issues.

            # Begin: transform to list with IDs in the item.
            # TODO: this transform can probably be removed and the internal format can change
            # to use the same structure as the actual JSON.
            local_json_data_compat = {
                "version": "v1",
                "blocklist": [],
                "data": [
                    {"id": pkg_idname, **value}
                    for pkg_idname, value in local_json_data.items()
                ],
            }
            # End: compatibility change.

            fh.write(json.dumps(local_json_data_compat, indent=2))

    def _json_data_refresh(
            self,
            *,
            error_fn: Callable[[BaseException], None],
            force: bool = False,
    ) -> None:
        if force or (self._pkg_manifest_remote is None) or (self._pkg_manifest_remote_mtime == 0):
            self._pkg_manifest_remote = None
            self._pkg_manifest_remote_mtime = 0
            self._pkg_manifest_local = None

        # Detect a local-only repository, there is no server to sync with
        # so generate the JSON from the TOML files.
        # While redundant, this avoids having to support multiple code-paths for local-only/remote repos.
        if self.repo_url == "":
            self._json_data_refresh_from_toml(error_fn=error_fn, force=force)

        filepath_json = os.path.join(self.directory, REPO_LOCAL_JSON)
        mtime_test = file_mtime_or_none(filepath_json)
        if self._pkg_manifest_remote is not None:
            # TODO: check the time of every installed package.
            if mtime_test == self._pkg_manifest_remote_mtime:
                return

        try:
            self._pkg_manifest_remote = json_from_filepath(filepath_json)
        except BaseException as ex:
            self._pkg_manifest_remote = None
            error_fn(ex)

        self._pkg_manifest_local = None
        if self._pkg_manifest_remote is not None:
            json_mtime = file_mtime_or_none(filepath_json)
            assert json_mtime is not None
            self._pkg_manifest_remote_mtime = json_mtime

    def pkg_manifest_from_local_ensure(
            self,
            *,
            error_fn: Callable[[BaseException], None],
            ignore_missing: bool = False,
    ) -> Optional[Dict[str, Dict[str, Any]]]:
        # Important for local-only repositories (where the directory name defines the ID).
        has_remote = self.repo_url != ""

        if self._pkg_manifest_local is None:
            self._json_data_ensure(
                ignore_missing=ignore_missing,
                error_fn=error_fn,
            )
            pkg_manifest_local = {}
            try:
                dir_entries = os.scandir(self.directory)
            except BaseException as ex:
                dir_entries = None
                error_fn(ex)

            for entry in (dir_entries if dir_entries is not None else ()):
                # Only check directories.
                if not entry.is_dir(follow_symlinks=True):
                    continue

                filename = entry.name

                # Simply ignore these paths without any warnings (accounts for `.git`, `__pycache__`, etc).
                if filename.startswith((".", "_")):
                    continue

                # Report any paths that cannot be used.
                if not filename.isidentifier():
                    error_fn(Exception("\"{:s}\" is not a supported module name, skipping".format(
                        os.path.join(self.directory, filename)
                    )))
                    continue

                filepath_toml = os.path.join(self.directory, filename, PKG_MANIFEST_FILENAME_TOML)
                try:
                    item_local = toml_from_filepath(filepath_toml)
                except BaseException as ex:
                    item_local = None
                    error_fn(ex)

                if item_local is None:
                    continue

                pkg_idname = item_local["id"]
                if has_remote:
                    # This should never happen, the user may have manually renamed a directory.
                    if pkg_idname != filename:
                        print("Skipping package with inconsistent name: \"{:s}\" mismatch \"{:s}\"".format(
                            filename,
                            pkg_idname,
                        ))
                        continue
                else:
                    pkg_idname = filename

                # Validate so local-only packages with invalid manifests aren't used.
                if (error_str := pkg_manifest_dict_is_valid_or_error(item_local, from_repo=False)):
                    error_fn(Exception(error_str))
                    continue

                pkg_manifest_local[pkg_idname] = item_local
            self._pkg_manifest_local = pkg_manifest_local
        return self._pkg_manifest_local

    def pkg_manifest_from_remote_ensure(
            self,
            *,
            error_fn: Callable[[BaseException], None],
            ignore_missing: bool = False,
    ) -> Optional[Dict[str, Dict[str, Any]]]:
        if self._pkg_manifest_remote is None:
            self._json_data_ensure(
                ignore_missing=ignore_missing,
                error_fn=error_fn,
            )
        return self._pkg_manifest_remote

    def force_local_refresh(self) -> None:
        self._pkg_manifest_local = None


class RepoCacheStore:
    __slots__ = (
        "_repos",
        "_is_init",
    )

    def __init__(self) -> None:
        self._repos: List[_RepoCacheEntry] = []
        self._is_init = False

    def is_init(self) -> bool:
        return self._is_init

    def refresh_from_repos(
            self, *,
            repos: List[Tuple[str, str]],
            force: bool = False,
    ) -> None:
        """
        Initialize or update repositories.
        """
        repos_prev = {}
        if not force:
            for repo_entry in self._repos:
                repos_prev[repo_entry.directory, repo_entry.repo_url] = repo_entry
        self._repos.clear()

        for directory, repo_url in repos:
            repo_entry_test = repos_prev.get((directory, repo_url))
            if repo_entry_test is None:
                repo_entry_test = _RepoCacheEntry(directory, repo_url)
            self._repos.append(repo_entry_test)
        self._is_init = True

    def refresh_remote_from_directory(
            self,
            directory: str,
            *,
            error_fn: Callable[[BaseException], None],
            force: bool = False,
    ) -> None:
        for repo_entry in self._repos:
            if directory == repo_entry.directory:
                repo_entry._json_data_refresh(force=force, error_fn=error_fn)
                return
        raise ValueError("Directory {:s} not a known repo".format(directory))

    def refresh_local_from_directory(
            self,
            directory: str,
            *,
            error_fn: Callable[[BaseException], None],
            ignore_missing: bool = False,
            directory_subset: Optional[Set[str]] = None,
    ) -> Optional[Dict[str, Dict[str, Any]]]:
        for repo_entry in self._repos:
            if directory == repo_entry.directory:
                # Force refresh.
                repo_entry.force_local_refresh()
                return repo_entry.pkg_manifest_from_local_ensure(
                    ignore_missing=ignore_missing,
                    error_fn=error_fn,
                )
        raise ValueError("Directory {:s} not a known repo".format(directory))

    def pkg_manifest_from_remote_ensure(
            self,
            *,
            error_fn: Callable[[BaseException], None],
            check_files: bool = False,
            ignore_missing: bool = False,
            directory_subset: Optional[Set[str]] = None,
    ) -> Generator[Optional[Dict[str, Dict[str, Any]]], None, None]:
        for repo_entry in self._repos:
            if directory_subset is not None:
                if repo_entry.directory not in directory_subset:
                    continue

            json_data = repo_entry._json_data_ensure(
                check_files=check_files,
                ignore_missing=ignore_missing,
                error_fn=error_fn,
            )
            if json_data is None:
                # The repository may be fresh, not yet initialized.
                yield None
            else:
                pkg_manifest_remote = {}
                # "data" should always exist, it's not the purpose of this function to fully validate though.
                json_items = json_data.get("data")
                if json_items is None:
                    error_fn(ValueError("JSON was missing \"data\" key"))
                    yield None
                else:
                    for item_remote in json_items:
                        # TODO(@ideasman42): we may want to include the "id"; as part of moving to a new format
                        # the "id" used not to be part of each item, so users of this API assume it's not.
                        # The `item_remote` could be used in-place however that needs further testing.
                        item_remote_copy = item_remote.copy()
                        pkg_idname = item_remote_copy.pop("id")
                        pkg_manifest_remote[pkg_idname] = item_remote_copy
                    yield pkg_manifest_remote

    def pkg_manifest_from_local_ensure(
            self,
            *,
            error_fn: Callable[[BaseException], None],
            check_files: bool = False,
            directory_subset: Optional[Set[str]] = None,
    ) -> Generator[Optional[Dict[str, Dict[str, Any]]], None, None]:
        for repo_entry in self._repos:
            if directory_subset is not None:
                if repo_entry.directory not in directory_subset:
                    continue
            if check_files:
                repo_entry.force_local_refresh()
            yield repo_entry.pkg_manifest_from_local_ensure(error_fn=error_fn)

    def clear(self) -> None:
        self._repos.clear()
        self._is_init = False
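

# A minimal usage sketch (an illustrative addition, paths and URL are
# hypothetical): populate the store, then iterate the remote manifests;
# `None` entries correspond to repositories that have not been synced yet.
def _example_repo_cache_store() -> None:
    store = RepoCacheStore()
    store.refresh_from_repos(repos=[("/path/to/repo", "https://example.org/repo")])
    for pkg_manifest in store.pkg_manifest_from_remote_ensure(error_fn=print):
        if pkg_manifest is not None:
            for pkg_idname, item in pkg_manifest.items():
                print(pkg_idname, item.get("version"))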


# -----------------------------------------------------------------------------
# Public Repo Lock

class RepoLock:
    """
    Lock multiple repositories, one or all may fail,
    it's up to the caller to check.

    Access via the ``RepoLockContext`` where possible to avoid the lock being left held.
    """
    __slots__ = (
        "_repo_directories",
        "_repo_lock_files",
        "_cookie",
        "_held",
    )

    def __init__(self, *, repo_directories: Sequence[str], cookie: str):
        """
        :arg repo_directories:
            Directories to attempt to lock.
        :arg cookie:
            A path which is used as a reference.
            It must point to a path that exists.
            When a lock exists, check if the cookie path exists, if it doesn't, allow acquiring the lock.
        """
        self._repo_directories = tuple(repo_directories)
        self._repo_lock_files: List[Tuple[str, str]] = []
        self._held = False
        self._cookie = cookie

    def __del__(self) -> None:
        if not self._held:
            return
        sys.stderr.write("{:s}: freed without releasing lock!".format(type(self).__name__))

    @staticmethod
    def _is_locked_with_stale_cookie_removal(local_lock_file: str, cookie: str) -> Optional[str]:
        if os.path.exists(local_lock_file):
            try:
                with open(local_lock_file, "r", encoding="utf8") as fh:
                    data = fh.read()
            except BaseException as ex:
                return "lock file could not be read: {:s}".format(str(ex))

            # The lock is held.
            if os.path.exists(data):
                if data == cookie:
                    return "lock is already held by this session"
                return "lock is held by another session: {:s}".format(data)

            # The lock is held (but stale), remove it.
            try:
                os.remove(local_lock_file)
            except BaseException as ex:
                return "lock file could not be removed: {:s}".format(str(ex))
        return None

    def acquire(self) -> Dict[str, Optional[str]]:
        """
        Return directories and the lock status,
        with None if locking succeeded.
        """
        if self._held:
            raise Exception("acquire(): called with an existing lock!")
        if not os.path.exists(self._cookie):
            raise Exception("acquire(): cookie doesn't exist! (when it should)")

        # Assume all succeed.
        result: Dict[str, Optional[str]] = {directory: None for directory in self._repo_directories}
        for directory in self._repo_directories:
            local_private_dir = os.path.join(directory, REPO_LOCAL_PRIVATE_DIR)

            # This most likely exists, create if it doesn't.
            if not os.path.isdir(local_private_dir):
                os.makedirs(local_private_dir)

            local_lock_file = os.path.join(local_private_dir, REPO_LOCAL_PRIVATE_LOCK)
            # Attempt to get the lock, kick out stale locks.
            if (lock_msg := self._is_locked_with_stale_cookie_removal(local_lock_file, self._cookie)) is not None:
                result[directory] = "Lock exists: {:s}".format(lock_msg)
                continue
            try:
                with open(local_lock_file, "w", encoding="utf8") as fh:
                    fh.write(self._cookie)
            except BaseException as ex:
                result[directory] = "Lock could not be created: {:s}".format(str(ex))
                # Remove if it was created (but failed to write)... disk-full?
                try:
                    os.remove(local_lock_file)
                except BaseException:
                    pass
                continue

            # Success, the file is locked.
            self._repo_lock_files.append((directory, local_lock_file))
            self._held = True
        return result

    def release(self) -> Dict[str, Optional[str]]:
        # NOTE: lots of error checks here, mostly to give insight into the very unlikely case this fails.
        if not self._held:
            raise Exception("release(): called without a lock!")

        result: Dict[str, Optional[str]] = {directory: None for directory in self._repo_directories}
        for directory, local_lock_file in self._repo_lock_files:
            if not os.path.exists(local_lock_file):
                result[directory] = "release(): lock missing when expected, continuing."
                continue
            try:
                with open(local_lock_file, "r", encoding="utf8") as fh:
                    data = fh.read()
            except BaseException as ex:
                result[directory] = "release(): lock file could not be read: {:s}".format(str(ex))
                continue
            # Owned by another application, this shouldn't happen.
            if data != self._cookie:
                result[directory] = "release(): lock was unexpectedly stolen by another program: {:s}".format(data)
                continue

            # This is our lock file, we're allowed to remove it!
            try:
                os.remove(local_lock_file)
            except BaseException as ex:
                result[directory] = "release(): failed to remove file {!r}".format(ex)

        self._held = False
        return result


class RepoLockContext:
    __slots__ = (
        "_repo_lock",
    )

    def __init__(self, *, repo_directories: Sequence[str], cookie: str):
        self._repo_lock = RepoLock(repo_directories=repo_directories, cookie=cookie)

    def __enter__(self) -> Dict[str, Optional[str]]:
        return self._repo_lock.acquire()

    def __exit__(self, _ty: Any, _value: Any, _traceback: Any) -> None:
        self._repo_lock.release()
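

# A minimal usage sketch (an illustrative addition, the paths are hypothetical):
# the mapping returned by `__enter__` stores an error string per directory,
# or None when that directory was locked successfully.
def _example_repo_lock() -> None:
    with RepoLockContext(
            repo_directories=["/path/to/repo"],
            cookie="/path/to/cookie-file",
    ) as lock_result:
        for directory, error in lock_result.items():
            if error is not None:
                print("Lock failed:", directory, error)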