# ##### BEGIN GPL LICENSE BLOCK #####
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU General Public License
#  as published by the Free Software Foundation; either version 2
#  of the License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software Foundation,
#  Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# ##### END GPL LICENSE BLOCK #####

"""
See documentation for usage
https://github.com/CGCookie/blender-addon-updater
"""

__version__ = "1.1.1"

import errno
import traceback
import platform
import ssl
import urllib.request
import urllib.error
import urllib
import os
import json
import zipfile
import shutil
import threading
import fnmatch
from datetime import datetime, timedelta

# Blender imports, used in limited cases.
import bpy
import addon_utils


# -----------------------------------------------------------------------------
# The main class
# -----------------------------------------------------------------------------


class SingletonUpdater:
    """Addon updater service class.

    This is the singleton class to instance once and then reference where
    needed throughout the addon. It implements all the interfaces for running
    updates.
    """

    def __init__(self):
        self._engine = ForgejoEngine()
        self._user = None
        self._repo = None
        self._website = None
        self._current_version = None
        self._subfolder_path = None
        self._tags = list()
        self._tag_latest = None
        self._tag_names = list()
        self._latest_release = None
        self._use_releases = False
        self._include_branches = False
        self._include_branch_list = ['master']
        self._include_branch_auto_check = False
        self._manual_only = False
        self._version_min_update = None
        self._version_max_update = None

        # By default, backup current addon on update/target install.
        self._backup_current = True
        self._backup_ignore_patterns = None

        # Set patterns the files to overwrite during an update.
        self._overwrite_patterns = ["*.py", "*.pyc"]
        self._remove_pre_update_patterns = list()

        # By default, don't auto disable+re-enable the addon after an update,
        # as this is less stable/often won't fully reload all modules anyways.
        self._auto_reload_post_update = False

        # Settings for the frequency of automated background checks.
        self._check_interval_enabled = False
        self._check_interval_months = 0
        self._check_interval_days = 7
        self._check_interval_hours = 0
        self._check_interval_minutes = 0

        # runtime variables, initial conditions
        self._verbose = False
        self._use_print_traces = True
        self._fake_install = False
        self._async_checking = False  # only true when async daemon started
        self._update_ready = None
        self._update_link = None
        self._update_version = None
        self._source_zip = None
        self._check_thread = None
        self._select_link = None
        self.skip_tag = None

        # Get data from the running blender module (addon).
        self._addon = __package__.lower()
        self._addon_package = __package__  # Must not change.
        self._updater_path = os.path.join(
            os.path.dirname(__file__), self._addon + "_updater")
        self._addon_root = os.path.dirname(__file__)
        self._json = dict()
        self._error = None
        self._error_msg = None
        self._prefiltered_tag_count = 0

        # UI properties, not used within this module but still useful to have.

        # to verify a valid import, in place of placeholder import
        self.show_popups = True  # UI uses to show popups or not.
        self.invalid_updater = False

        # pre-assign basic select-link function
        # Stored as a plain (unbound) function on the instance, hence the
        # explicit `self` parameter in its signature.
        def select_link_function(self, tag):
            return tag["zipball_url"]

        self._select_link = select_link_function

    def print_trace(self):
        """Print handled exception details when use_print_traces is set"""
        if self._use_print_traces:
            traceback.print_exc()

    def print_verbose(self, msg):
        """Print out a verbose logging message if verbose is true."""
        if not self._verbose:
            return
        print("{} addon: ".format(self.addon) + str(msg))

    @staticmethod
    def _coerce_bool(value, error_message):
        """Return bool(value), raising ValueError(error_message) on failure."""
        try:
            return bool(value)
        except Exception:
            raise ValueError(error_message)

    @staticmethod
    def _is_subpath(parent, child):
        """Return True if `child` resolves to `parent` or a path inside it."""
        parent = os.path.realpath(parent)
        child = os.path.realpath(child)
        try:
            return os.path.commonpath([parent, child]) == parent
        except ValueError:
            # Raised e.g. for paths on different drives.
            return False

    # -------------------------------------------------------------------------
    # Getters and setters
    # -------------------------------------------------------------------------

    @property
    def addon(self):
        return self._addon

    @addon.setter
    def addon(self, value):
        self._addon = str(value)

    @property
    def api_url(self):
        return self._engine.api_url

    @api_url.setter
    def api_url(self, value):
        if not self.check_is_url(value):
            raise ValueError("Not a valid URL: " + str(value))
        self._engine.api_url = value

    @property
    def async_checking(self):
        return self._async_checking

    @property
    def auto_reload_post_update(self):
        return self._auto_reload_post_update

    @auto_reload_post_update.setter
    def auto_reload_post_update(self, value):
        self._auto_reload_post_update = self._coerce_bool(
            value, "auto_reload_post_update must be a boolean value")

    @property
    def backup_current(self):
        return self._backup_current

    @backup_current.setter
    def backup_current(self, value):
        # None is treated as "do not back up"; other values stored as given.
        if value is None:
            self._backup_current = False
        else:
            self._backup_current = value

    @property
    def backup_ignore_patterns(self):
        return self._backup_ignore_patterns

    @backup_ignore_patterns.setter
    def backup_ignore_patterns(self, value):
        if value is None:
            self._backup_ignore_patterns = None
        elif not isinstance(value, list):
            raise ValueError("Backup pattern must be in list format")
        else:
            self._backup_ignore_patterns = value

    @property
    def check_interval(self):
        """Tuple of (enabled, months, days, hours, minutes)."""
        return (self._check_interval_enabled,
                self._check_interval_months,
                self._check_interval_days,
                self._check_interval_hours,
                self._check_interval_minutes)

    @property
    def current_version(self):
        return self._current_version

    @current_version.setter
    def current_version(self, tuple_values):
        """Store the version as a tuple of ints (or None to clear it).

        Raises:
            ValueError: if the value is not iterable or holds non-int items.
        """
        if tuple_values is None:
            self._current_version = None
            return
        if type(tuple_values) is not tuple:
            try:
                # Keep the converted tuple: validating and storing the same
                # object avoids losing the items of a one-shot iterable.
                tuple_values = tuple(tuple_values)
            except Exception:
                raise ValueError(
                    "current_version must be a tuple of integers")
        for i in tuple_values:
            if type(i) is not int:
                raise ValueError(
                    "current_version must be a tuple of integers")
        self._current_version = tuple_values

    @property
    def engine(self):
        return self._engine.name

    @engine.setter
    def engine(self, value):
        engine = value.lower()
        if engine == "github":
            self._engine = GithubEngine()
        elif engine == "gitlab":
            self._engine = GitlabEngine()
        elif engine == "bitbucket":
            self._engine = BitbucketEngine()
        elif engine == "forgejo":
            self._engine = ForgejoEngine()
        else:
            raise ValueError("Invalid engine selection")

    @property
    def error(self):
        return self._error

    @property
    def error_msg(self):
        return self._error_msg

    @property
    def fake_install(self):
        return self._fake_install

    @fake_install.setter
    def fake_install(self, value):
        if not isinstance(value, bool):
            raise ValueError("fake_install must be a boolean value")
        self._fake_install = bool(value)

    # not currently used
    @property
    def include_branch_auto_check(self):
        return self._include_branch_auto_check

    @include_branch_auto_check.setter
    def include_branch_auto_check(self, value):
        self._include_branch_auto_check = self._coerce_bool(
            value, "include_branch_autocheck must be a boolean")

    @property
    def include_branch_list(self):
        return self._include_branch_list

    @include_branch_list.setter
    def include_branch_list(self, value):
        """Set the branch names to offer; None resets to ['master']."""
        if value is None:
            self._include_branch_list = ['master']
        elif not isinstance(value, list) or len(value) == 0:
            raise ValueError(
                "include_branch_list should be a list of valid branches")
        else:
            self._include_branch_list = value

    @property
    def include_branches(self):
        return self._include_branches

    @include_branches.setter
    def include_branches(self, value):
        self._include_branches = self._coerce_bool(
            value, "include_branches must be a boolean value")

    @property
    def json(self):
        # Lazily populate the state dict on first access.
        if len(self._json) == 0:
            self.set_updater_json()
        return self._json

    @property
    def latest_release(self):
        return self._latest_release

    @property
    def manual_only(self):
        return self._manual_only

    @manual_only.setter
    def manual_only(self, value):
        self._manual_only = self._coerce_bool(
            value, "manual_only must be a boolean value")

    @property
    def overwrite_patterns(self):
        return self._overwrite_patterns

    @overwrite_patterns.setter
    def overwrite_patterns(self, value):
        if value is None:
            self._overwrite_patterns = ["*.py", "*.pyc"]
        elif not isinstance(value, list):
            raise ValueError("overwrite_patterns needs to be in a list format")
        else:
            self._overwrite_patterns = value

    @property
    def private_token(self):
        return self._engine.token

    @private_token.setter
    def private_token(self, value):
        if value is None:
            self._engine.token = None
        else:
            self._engine.token = str(value)

    @property
    def remove_pre_update_patterns(self):
        return self._remove_pre_update_patterns

    @remove_pre_update_patterns.setter
    def remove_pre_update_patterns(self, value):
        if value is None:
            self._remove_pre_update_patterns = list()
        elif not isinstance(value, list):
            raise ValueError(
                "remove_pre_update_patterns needs to be in a list format")
        else:
            self._remove_pre_update_patterns = value

    @property
    def repo(self):
        return self._repo

    @repo.setter
    def repo(self, value):
        try:
            self._repo = str(value)
        except Exception:
            raise ValueError("repo must be a string value")

    @property
    def select_link(self):
        return self._select_link

    @select_link.setter
    def select_link(self, value):
        # ensure it is a function assignment, with signature:
        # input self, tag; returns link name
        if not hasattr(value, "__call__"):
            raise ValueError("select_link must be a function")
        self._select_link = value

    @property
    def stage_path(self):
        return self._updater_path

    @stage_path.setter
    def stage_path(self, value):
        """Set the updater working folder, creating it if it is missing.

        The assignment is silently skipped (with verbose logging) when the
        value is None or the folder cannot be created.
        """
        if value is None:
            self.print_verbose("Aborting assigning stage_path, it's null")
            return
        if not os.path.exists(value):
            try:
                os.makedirs(value)
            except Exception:
                self.print_verbose("Error trying to staging path")
                self.print_trace()
                return
        self._updater_path = value

    @property
    def subfolder_path(self):
        return self._subfolder_path

    @subfolder_path.setter
    def subfolder_path(self, value):
        self._subfolder_path = value

    @property
    def tags(self):
        """Names of the currently loaded tags (empty list if none)."""
        return [tag["name"] for tag in self._tags]

    @property
    def tag_latest(self):
        if self._tag_latest is None:
            return None
        return self._tag_latest["name"]

    @property
    def update_link(self):
        return self._update_link

    @property
    def update_ready(self):
        return self._update_ready

    @property
    def update_version(self):
        return self._update_version

    @property
    def use_releases(self):
        return self._use_releases

    @use_releases.setter
    def use_releases(self, value):
        self._use_releases = self._coerce_bool(
            value, "use_releases must be a boolean value")

    @property
    def user(self):
        return self._user

    @user.setter
    def user(self, value):
        try:
            self._user = str(value)
        except Exception:
            raise ValueError("User must be a string value")

    @property
    def verbose(self):
        return self._verbose

    @verbose.setter
    def verbose(self, value):
        self._verbose = self._coerce_bool(
            value, "Verbose must be a boolean value")
        self.print_verbose("Verbose is enabled")

    @property
    def use_print_traces(self):
        return self._use_print_traces

    @use_print_traces.setter
    def use_print_traces(self, value):
        self._use_print_traces = self._coerce_bool(
            value, "use_print_traces must be a boolean value")

    @property
    def version_max_update(self):
        return self._version_max_update

    @version_max_update.setter
    def version_max_update(self, value):
        if value is None:
            self._version_max_update = None
            return
        if not isinstance(value, tuple):
            raise ValueError("Version maximum must be a tuple")
        for subvalue in value:
            if type(subvalue) is not int:
                raise ValueError("Version elements must be integers")
        self._version_max_update = value

    @property
    def version_min_update(self):
        return self._version_min_update

    @version_min_update.setter
    def version_min_update(self, value):
        if value is None:
            self._version_min_update = None
            return
        if not isinstance(value, tuple):
            raise ValueError("Version minimum must be a tuple")
        for subvalue in value:
            if type(subvalue) is not int:
                raise ValueError("Version elements must be integers")
        self._version_min_update = value

    @property
    def website(self):
        return self._website

    @website.setter
    def website(self, value):
        if not self.check_is_url(value):
            raise ValueError("Not a valid URL: " + str(value))
        self._website = value

    # -------------------------------------------------------------------------
    # Parameter validation related functions
    # -------------------------------------------------------------------------

    @staticmethod
    def check_is_url(url):
        """Loosely check that `url` looks like an http(s) URL.

        Returns False for non-string input instead of raising, so that the
        property setters can report a ValueError.
        """
        if not isinstance(url, str):
            return False
        if not ("http://" in url or "https://" in url):
            return False
        return "." in url

    def _get_tag_names(self):
        """Refresh the tags from the server and return their names."""
        self.get_tags()
        return [tag["name"] for tag in self._tags]

    def set_check_interval(self, enabled=False,
                           months=0, days=14, hours=0, minutes=0):
        """Set the time interval between automated checks, and if enabled.

        Has enabled = False as default to not check against frequency,
        if enabled, default is 2 weeks.

        Raises:
            ValueError: if `enabled` is not a bool or any of the interval
                parts is not an int.
        """
        if type(enabled) is not bool:
            raise ValueError("Enable must be a boolean value")
        if type(months) is not int:
            raise ValueError("Months must be an integer value")
        if type(days) is not int:
            raise ValueError("Days must be an integer value")
        if type(hours) is not int:
            raise ValueError("Hours must be an integer value")
        if type(minutes) is not int:
            raise ValueError("Minutes must be an integer value")

        self._check_interval_enabled = enabled
        self._check_interval_months = months
        self._check_interval_days = days
        self._check_interval_hours = hours
        self._check_interval_minutes = minutes

    def __repr__(self):
        return "<Module updater from {a}>".format(a=__file__)

    def __str__(self):
        return "Updater, with user: {a}, repository: {b}, url: {c}".format(
            a=self._user, b=self._repo, c=self.form_repo_url())

    # -------------------------------------------------------------------------
    # API-related functions
    # -------------------------------------------------------------------------

    def form_repo_url(self):
        return self._engine.form_repo_url(self)

    def form_tags_url(self):
        return self._engine.form_tags_url(self)

    def form_branch_url(self, branch):
        return self._engine.form_branch_url(branch, self)

    def get_tags(self):
        """Fetch tags from the server and update the tag-related state.

        Sets self._tags (branches first, when enabled), self._tag_latest
        and self._prefiltered_tag_count, and records an error message when
        no usable release is found.
        """
        request = self.form_tags_url()
        self.print_verbose("Getting tags from server")

        # get all tags, internet call
        all_tags = self._engine.parse_tags(self.get_api(request), self)
        if all_tags is None:
            all_tags = list()
        self._prefiltered_tag_count = len(all_tags)

        # pre-process to skip tags
        if self.skip_tag is not None:
            self._tags = [tg for tg in all_tags if not self.skip_tag(self, tg)]
        else:
            self._tags = all_tags

        # get additional branches too, if needed, and place in front
        # Does NO checking here whether branch is valid
        if self._include_branches:
            branch_tags = [
                {"name": branch.title(),
                 "zipball_url": self.form_branch_url(branch)}
                for branch in self._include_branch_list]
            self._tags = branch_tags + self._tags

        if self._prefiltered_tag_count == 0 and not self._include_branches:
            self._tag_latest = None
            if self._error is None:  # if not None, could have had no internet
                self._error = "No releases found"
                self._error_msg = "No releases or tags found in repository"
            self.print_verbose("No releases or tags found in repository")
        elif self._prefiltered_tag_count == 0 and self._include_branches:
            if not self._error:
                self._tag_latest = self._tags[0]
            branch = self._include_branch_list[0]
            self.print_verbose("{} branch found, no releases: {}".format(
                branch, self._tags[0]))
        elif ((self._include_branches
               and len(self._tags) == len(self._include_branch_list))
              or (not self._include_branches
                  and len(self._tags) == 0
                  and self._prefiltered_tag_count > 0)):
            # Tags existed on the server but all were filtered out.
            self._tag_latest = None
            self._error = "No releases available"
            self._error_msg = "No versions found within compatible version range"
            self.print_verbose(self._error_msg)
        else:
            if not self._include_branches:
                self._tag_latest = self._tags[0]
                self.print_verbose(
                    "Most recent tag found:" + str(self._tags[0]['name']))
            else:
                # Don't return branch if in list.
                n = len(self._include_branch_list)
                self._tag_latest = self._tags[n]  # guaranteed at least len()=n+1
                self.print_verbose(
                    "Most recent tag found:" + str(self._tags[n]['name']))

    def get_raw(self, url):
        """All API calls to base url.

        Returns the decoded response body, or None when an HTTP/URL error
        occurred (in which case self._error / self._error_msg are set).
        """
        request = urllib.request.Request(url)
        # NOTE(security): an unverified SSL context disables certificate
        # validation. Kept as in the original implementation; review before
        # relying on this for untrusted networks.
        try:
            context = ssl._create_unverified_context()
        except Exception:
            # Some blender packaged python versions don't have this, largely
            # useful for local network setups otherwise minimal impact.
            context = None

        # Setup private request headers if appropriate.
        if self._engine.token is not None:
            if self._engine.name == "gitlab":
                request.add_header('PRIVATE-TOKEN', self._engine.token)
            else:
                self.print_verbose("Tokens not setup for engine yet")

        # Always set user agent.
        request.add_header(
            'User-Agent', "Python/" + str(platform.python_version()))

        # Run the request.
        try:
            if context:
                result = urllib.request.urlopen(request, context=context)
            else:
                result = urllib.request.urlopen(request)
        except urllib.error.HTTPError as e:
            if str(e.code) == "403":
                self._error = "HTTP error (access denied)"
                self._error_msg = str(e.code) + " - server error response"
                print(self._error, self._error_msg)
            else:
                self._error = "HTTP error"
                self._error_msg = str(e.code)
                print(self._error, self._error_msg)
            self.print_trace()
            self._update_ready = None
            return None
        except urllib.error.URLError as e:
            reason = str(e.reason)
            if "TLSV1_ALERT" in reason or "SSL" in reason.upper():
                self._error = "Connection rejected, download manually"
                self._error_msg = reason
                print(self._error, self._error_msg)
            else:
                self._error = "URL error, check internet connection"
                self._error_msg = reason
                print(self._error, self._error_msg)
            self.print_trace()
            self._update_ready = None
            return None

        # Close the response even if reading fails part way through.
        try:
            result_string = result.read()
        finally:
            result.close()
        return result_string.decode()

    def get_api(self, url):
        """Result of all api calls, decoded into json format.

        Returns None when the request failed or the body is not valid JSON.
        """
        get = self.get_raw(url)
        if get is None:
            return None
        try:
            return json.JSONDecoder().decode(get)
        except Exception as e:
            self._error = "API response has invalid JSON format"
            # Decode errors carry no `.reason` attribute; use the message.
            self._error_msg = str(e)
            self._update_ready = None
            print(self._error, self._error_msg)
            self.print_trace()
            return None

    def stage_repository(self, url):
        """Create a working directory and download the new files

        Returns True when the zip was downloaded to the staging folder,
        False otherwise (self._error / self._error_msg are then set).
        """
        local = os.path.join(self._updater_path, "update_staging")
        error = None

        # Make/clear the staging folder, to ensure the folder is always clean.
        self.print_verbose(
            "Preparing staging folder for download:\n" + str(local))
        if os.path.isdir(local):
            try:
                shutil.rmtree(local)
                os.makedirs(local)
            except Exception:
                error = "failed to remove existing staging directory"
                self.print_trace()
        else:
            try:
                os.makedirs(local)
            except Exception:
                error = "failed to create staging directory"
                self.print_trace()

        if error is not None:
            self.print_verbose("Error: Aborting update, " + error)
            self._error = "Update aborted, staging path error"
            self._error_msg = "Error: {}".format(error)
            return False

        if self._backup_current:
            self.create_backup()

        self.print_verbose("Now retrieving the new source zip")
        self._source_zip = os.path.join(local, "source.zip")
        self.print_verbose("Starting download update zip")
        try:
            request = urllib.request.Request(url)
            # NOTE(security): certificate validation is disabled here, as in
            # get_raw; kept unchanged from the original implementation.
            context = ssl._create_unverified_context()

            # Setup private token if appropriate.
            if self._engine.token is not None:
                if self._engine.name == "gitlab":
                    request.add_header('PRIVATE-TOKEN', self._engine.token)
                else:
                    self.print_verbose(
                        "Tokens not setup for selected engine yet")

            # Always set user agent
            request.add_header(
                'User-Agent', "Python/" + str(platform.python_version()))

            response = urllib.request.urlopen(request, context=context)
            try:
                self.url_retrieve(response, self._source_zip)
            finally:
                response.close()

            # Add additional checks on file size being non-zero.
            self.print_verbose("Successfully downloaded update zip")
            return True
        except Exception as e:
            self._error = "Error retrieving download, bad link?"
            self._error_msg = "Error: {}".format(e)
            print("Error retrieving download, bad link?")
            print("Error: {}".format(e))
            self.print_trace()
            return False

    def create_backup(self):
        """Save a backup of the current installed addon prior to an update."""
        self.print_verbose("Backing up current addon folder")
        local = os.path.join(self._updater_path, "backup")
        tempdest = os.path.join(
            self._addon_root, os.pardir, self._addon + "_updater_backup_temp")

        self.print_verbose("Backup destination path: " + str(local))

        if os.path.isdir(local):
            try:
                shutil.rmtree(local)
            except Exception:
                self.print_verbose(
                    "Failed to removed previous backup folder, continuing")
                self.print_trace()

        # Remove the temp folder.
        # Shouldn't exist but could if previously interrupted.
        if os.path.isdir(tempdest):
            try:
                shutil.rmtree(tempdest)
            except Exception:
                self.print_verbose(
                    "Failed to remove existing temp folder, continuing")
                self.print_trace()

        # Make a full addon copy, temporarily placed outside the addon folder.
        try:
            ignore = None
            if self._backup_ignore_patterns is not None:
                ignore = shutil.ignore_patterns(*self._backup_ignore_patterns)
            shutil.copytree(self._addon_root, tempdest, ignore=ignore)
        except Exception:
            print("Failed to create backup, still attempting update.")
            self.print_trace()
            return
        shutil.move(tempdest, local)

        # Save the date for future reference.
        now = datetime.now()
        self._json["backup_date"] = "{m}-{d}-{yr}".format(
            m=now.strftime("%B"), d=now.day, yr=now.year)
        self.save_updater_json()

    def restore_backup(self):
        """Restore the last backed up addon version, user initiated only"""
        self.print_verbose("Restoring backup, backing up current addon folder")
        backuploc = os.path.join(self._updater_path, "backup")
        tempdest = os.path.join(
            self._addon_root, os.pardir, self._addon + "_updater_backup_temp")
        tempdest = os.path.abspath(tempdest)

        # Move instead contents back in place, instead of copy.
        shutil.move(backuploc, tempdest)
        shutil.rmtree(self._addon_root)
        os.rename(tempdest, self._addon_root)

        self._json["backup_date"] = ""
        self._json["just_restored"] = True
        self._json["just_updated"] = True
        self.save_updater_json()

        self.reload_addon()

    def unpack_staged_zip(self, clean=False):
        """Unzip the downloaded file, and validate contents

        Returns 0 on success and -1 on failure; on failure self._error and
        self._error_msg describe the problem.
        """
        if not self._source_zip or not os.path.isfile(self._source_zip):
            self.print_verbose("Error, update zip not found")
            self._error = "Install failed"
            self._error_msg = "Downloaded zip not found"
            return -1

        # Clear the existing source folder in case previous files remain.
        outdir = os.path.join(self._updater_path, "source")
        if os.path.isdir(outdir):
            try:
                shutil.rmtree(outdir)
                self.print_verbose("Source folder cleared")
            except Exception:
                self.print_trace()

        # Create parent directories if needed, would not be relevant unless
        # installing addon into another location or via an addon manager.
        try:
            os.makedirs(outdir)
        except Exception as err:
            print("Error occurred while making extract dir:")
            print(str(err))
            self.print_trace()
            self._error = "Install failed"
            self._error_msg = "Failed to make extract directory"
            return -1

        if not os.path.isdir(outdir):
            print("Failed to create source directory")
            self._error = "Install failed"
            self._error_msg = "Failed to create extract directory"
            return -1

        self.print_verbose(
            "Begin extracting source from zip:" + str(self._source_zip))
        try:
            zfile = zipfile.ZipFile(self._source_zip, "r")
        except (zipfile.BadZipFile, OSError):
            self._error = "Install failed"
            self._error_msg = "Resulting file is not a zip, cannot extract"
            self.print_verbose(self._error_msg)
            self.print_trace()
            return -1

        with zfile:
            # Now extract directly from the first subfolder (not root)
            # this avoids adding the first subfolder to the path length,
            # which can be too long if the download has the SHA in the name.
            zsep = '/'  # Not using os.sep, always the / value even on windows.
            for name in zfile.namelist():
                if zsep not in name:
                    continue
                top_folder = name[:name.index(zsep) + 1]
                if name == top_folder:
                    continue  # skip top level folder
                sub_path = name[name.index(zsep) + 1:]
                dest = os.path.join(outdir, sub_path)

                # Refuse entries that would land outside the extract folder
                # (e.g. containing ".." or an absolute path).
                if not self._is_subpath(outdir, dest):
                    self._error = "Install failed"
                    self._error_msg = "Zip contains an unsafe path"
                    self.print_verbose(self._error_msg + ": " + name)
                    return -1

                if name.endswith(zsep):
                    try:
                        os.makedirs(dest, exist_ok=True)
                        self.print_verbose("Extract - mkdir: " + dest)
                    except OSError:
                        self._error = "Install failed"
                        self._error_msg = "Could not create folder from zip"
                        self.print_trace()
                        return -1
                else:
                    # Not every zip lists its folders explicitly.
                    os.makedirs(os.path.dirname(dest), exist_ok=True)
                    with open(dest, "wb") as outfile:
                        outfile.write(zfile.read(name))
                    self.print_verbose("Extract - create: " + dest)

        self.print_verbose("Extracted source")

        unpath = os.path.join(self._updater_path, "source")
        if not os.path.isdir(unpath):
            self._error = "Install failed"
            self._error_msg = "Extracted path does not exist"
            print("Extracted path does not exist: ", unpath)
            return -1

        # Normalise the configured subfolder to the platform separator;
        # str.replace returns a new string, so keep the result.
        subfolder = self._subfolder_path
        if subfolder:
            subfolder = subfolder.replace('/', os.path.sep)
            subfolder = subfolder.replace('\\', os.path.sep)

        # Either directly in root of zip/one subfolder, or use specified path.
        if not os.path.isfile(os.path.join(unpath, "__init__.py")):
            dirlist = os.listdir(unpath)
            if len(dirlist) > 0:
                if not subfolder:
                    unpath = os.path.join(unpath, dirlist[0])
                else:
                    unpath = os.path.join(unpath, subfolder)

            # Smarter check for additional sub folders for a single folder
            # containing the __init__.py file.
            if not os.path.isfile(os.path.join(unpath, "__init__.py")):
                print("Not a valid addon found")
                print("Paths:")
                print(dirlist)
                self._error = "Install failed"
                self._error_msg = "No __init__ file found in new source"
                return -1

        # Merge code with the addon directory, using blender default behavior,
        # plus any modifiers indicated by user (e.g. force remove/keep).
        self.deep_merge_directory(self._addon_root, unpath, clean)

        # Now save the json state.
        # Change to True to trigger the handler on other side if allowing
        # reloading within same blender session.
        self._json["just_updated"] = True
        self.save_updater_json()
        self.reload_addon()
        self._update_ready = False
        return 0

    def deep_merge_directory(self, base, merger, clean=False):
        """Merge folder 'merger' into 'base' without deleting existing

        Args:
            base: Destination folder (the installed addon).
            merger: Folder holding the new files to bring in.
            clean: When True, first delete everything in `base` except the
                updater folder.

        Returns:
            -1 if either path does not exist, otherwise None.
        """
        if not os.path.exists(base):
            self.print_verbose("Base path does not exist:" + str(base))
            return -1
        elif not os.path.exists(merger):
            self.print_verbose("Merger path does not exist")
            return -1

        # Path to be aware of and not overwrite/remove/etc.
        staging_path = os.path.join(self._updater_path, "update_staging")

        def _norm(path):
            return os.path.normcase(os.path.abspath(path))

        # If clean install is enabled, clear existing files ahead of time
        # note: will not delete the updater folder (which holds the staging
        # and backup folders), but will delete all other folders/files in
        # the addon directory.
        error = None
        if clean:
            try:
                # Careful, this deletes entire subdirectories recursively...
                # Make sure that base is not a high level shared folder, but
                # is dedicated just to the addon itself.
                self.print_verbose(
                    "clean=True, clearing addon folder to fresh install state")

                # Remove root files and folders (except update folder).
                entries = os.listdir(base)
                files = [f for f in entries
                         if os.path.isfile(os.path.join(base, f))]
                folders = [f for f in entries
                           if os.path.isdir(os.path.join(base, f))]
                updater_dir = _norm(self._updater_path)

                for f in files:
                    os.remove(os.path.join(base, f))
                    self.print_verbose(
                        "Clean removing file {}".format(os.path.join(base, f)))
                for f in folders:
                    # Compare by value: identity (`is`) never matches a
                    # freshly joined path, which wiped the updater folder.
                    if _norm(os.path.join(base, f)) == updater_dir:
                        continue
                    shutil.rmtree(os.path.join(base, f))
                    self.print_verbose(
                        "Clean removing folder and contents {}".format(
                            os.path.join(base, f)))

            except Exception as err:
                error = "failed to create clean existing addon folder"
                print(error, str(err))
                self.print_trace()

        # Walk through the base addon folder for rules on pre-removing
        # but avoid removing/altering backup and updater file.
        for path, dirs, files in os.walk(base):
            # Prune ie skip updater folder.
            dirs[:] = [d for d in dirs
                       if os.path.join(path, d) != self._updater_path]
            for file in files:
                matched = any(
                    fnmatch.fnmatch(file, pattern)
                    for pattern in self.remove_pre_update_patterns)
                if not matched:
                    continue
                try:
                    os.remove(os.path.join(path, file))
                    self.print_verbose("Pre-removed file " + file)
                except OSError:
                    print("Failed to pre-remove " + file)
                    self.print_trace()

        # Walk through the temp addon sub folder for replacements
        # this implements the overwrite rules, which apply after
        # the above pre-removal rules. This also performs the
        # actual file copying/replacements.
        for path, dirs, files in os.walk(merger):
            # Verify structure works to prune updater sub folder overwriting.
            dirs[:] = [d for d in dirs
                       if os.path.join(path, d) != self._updater_path]
            rel_path = os.path.relpath(path, merger)
            dest_path = os.path.join(base, rel_path)
            if not os.path.exists(dest_path):
                os.makedirs(dest_path)
            for file in files:
                # Bring in additional logic around copying/replacing.
                # Blender default: overwrite .py's, don't overwrite the rest.
                dest_file = os.path.join(dest_path, file)
                src_file = os.path.join(path, file)

                if not os.path.isfile(dest_file):
                    # File did not previously exist, simply move it over.
                    os.rename(src_file, dest_file)
                    self.print_verbose(
                        "New file " + os.path.basename(dest_file))
                    continue

                # File already exists: replace only on overwrite match.
                replaced = any(
                    fnmatch.fnmatch(file, pattern)
                    for pattern in self._overwrite_patterns)
                if replaced:
                    # os.replace overwrites the destination in one step.
                    os.replace(src_file, dest_file)
                    self.print_verbose(
                        "Overwrote file " + os.path.basename(dest_file))
                else:
                    self.print_verbose(
                        "Pattern not matched to {}, not overwritten".format(
                            os.path.basename(dest_file)))

        # now remove the temp staging folder and downloaded zip
        try:
            shutil.rmtree(staging_path)
        except Exception:
            error = ("Error: Failed to remove existing staging directory, "
                     "consider manually removing ") + staging_path
            self.print_verbose(error)
            self.print_trace()

    def reload_addon(self):
        """Disable, refresh and re-enable the addon when auto reload is on."""
        # if post_update false, skip this function
        # else, unload/reload addon & trigger popup
        if not self._auto_reload_post_update:
            print("Restart blender to reload addon and complete update")
            return

        self.print_verbose("Reloading addon...")
        addon_utils.modules(refresh=True)
        bpy.utils.refresh_script_paths()

        # not allowed in restricted context, such as register module
        # toggle to refresh
        if "addon_disable" in dir(bpy.ops.wm):  # 2.7
            bpy.ops.wm.addon_disable(module=self._addon_package)
            bpy.ops.wm.addon_refresh()
            bpy.ops.wm.addon_enable(module=self._addon_package)
            print("2.7 reload complete")
        else:  # 2.8
            bpy.ops.preferences.addon_disable(module=self._addon_package)
            bpy.ops.preferences.addon_refresh()
            bpy.ops.preferences.addon_enable(module=self._addon_package)
            print("2.8 reload complete")

    # -------------------------------------------------------------------------
    # Other non-api functions and setups
    # -------------------------------------------------------------------------

    def clear_state(self):
        """Reset the update result, staged zip path and error fields."""
        self._update_ready = None
        self._update_link = None
        self._update_version = None
        self._source_zip = None
        self._error = None
        self._error_msg = None

    def url_retrieve(self, url_file, filepath):
        """Custom urlretrieve implementation

        Copies the readable object `url_file` to `filepath` in 8 KiB chunks.
        """
        chunk = 1024 * 8
        with open(filepath, "wb") as f:
            shutil.copyfileobj(url_file, f, chunk)

    def version_tuple_from_text(self, text):
        """Convert text into a tuple of numbers (int).

        Should go through string and remove all non-integers, and for any
        given break split into a different section.

        Returns () for None or, when branches are not included, for text
        without digits. With branches included, digit-free text is returned
        unchanged (as given, not wrapped in a tuple).
        """
        if text is None:
            return ()

        segments = list()
        tmp = ''
        for char in str(text):
            if char.isdigit():
                tmp += char
            elif tmp:
                segments.append(int(tmp))
                tmp = ''
        if tmp:
            segments.append(int(tmp))

        if len(segments) == 0:
            self.print_verbose("No version strings found text: " + str(text))
            if not self._include_branches:
                return ()
            else:
                # Intentionally the raw value, e.g. a branch name.
                return (text)
        return tuple(segments)

    def check_for_update_async(self, callback=None):
        """Called for running check in a background thread

        If the saved state already reports an update as ready, the cached
        result is used and `callback(True)` is invoked (when a callback was
        given) without starting a new check.
        """
        state = self._json if self._json is not None else dict()
        version_text = state.get("version_text")
        is_ready = bool(
            "update_ready" in state
            and version_text
            and state["update_ready"])

        if is_ready:
            self._update_ready = True
            self._update_link = version_text["link"]
            self._update_version = str(version_text["version"])
            # Cached update.
            if callback:
                callback(True)
            return

        # do the check
        if not self._check_interval_enabled:
            return
        elif self._async_checking:
            self.print_verbose("Skipping async check, already started")
            # already running the bg thread
        elif self._update_ready is None:
            print("{} updater: Running background check for update".format(
                self.addon))
            self.start_async_check_update(False, callback)

    def check_for_update_now(self, callback=None):
        """Force a fresh update check unless one is already running."""
        self._error = None
        self._error_msg = None
        self.print_verbose(
            "Check update pressed, first getting current status")
        if self._async_checking:
            self.print_verbose("Skipping async check, already started")
            return  # already running the bg thread
        # Discard any previous result so the check runs again.
        self._update_ready = None
        self.start_async_check_update(True, callback)

    def check_for_update(self, now=False):
        """Check for update not in a synchronous manner.

        This function is not async, will always return in sequential fashion
        but should have a parent which calls it in another thread.
        """
        self.print_verbose("Checking for update function")

        # clear the errors if any
        self._error = None
        self._error_msg = None

        # avoid running again in, just return past result if found
        # but if force now check, then still do it
        if self._update_ready is not None and not now:
            return (self._update_ready,
                    self._update_version,
                    self._update_link)

        if self._current_version is None:
            raise ValueError("current_version not yet defined")

        if self._repo is None:
            raise ValueError("repo not yet defined")

        if self._user is None:
            raise ValueError("username not yet defined")

        self.set_updater_json()  # self._json

        if not now and not self.past_interval_timestamp():
            self.print_verbose(
                "Aborting check for updated, check interval not reached")
            return (False, None, None)

        # check if using tags or releases
        # note that if called the first time, this will pull tags from online
        if self._fake_install:
            self.print_verbose(
                "fake_install = True, setting fake version as ready")
            self._update_ready = True
            self._update_version = "(999,999,999)"
            self._update_link = "http://127.0.0.1"

            return (self._update_ready,
                    self._update_version,
                    self._update_link)

        # Primary internet call, sets self._tags and self._tag_latest.
        self.get_tags()

        self._json["last_check"] = str(datetime.now())
        self.save_updater_json()

        # Can be () or ('master') in addition to branches, and version tag.
new_version = self.version_tuple_from_text(self.tag_latest) if len(self._tags) == 0: self._update_ready = False self._update_version = None self._update_link = None return (False, None, None) if not self._include_branches: link = self.select_link(self, self._tags[0]) else: n = len(self._include_branch_list) if len(self._tags) == n: # effectively means no tags found on repo # so provide the first one as default link = self.select_link(self, self._tags[0]) else: link = self.select_link(self, self._tags[n]) if new_version == (): self._update_ready = False self._update_version = None self._update_link = None return (False, None, None) elif str(new_version).lower() in self._include_branch_list: # Handle situation where master/whichever branch is included # however, this code effectively is not triggered now # as new_version will only be tag names, not branch names. if not self._include_branch_auto_check: # Don't offer update as ready, but set the link for the # default branch for installing. self._update_ready = False self._update_version = new_version self._update_link = link self.save_updater_json() return (True, new_version, link) else: # Bypass releases and look at timestamp of last update from a # branch compared to now, see if commit values match or not. raise ValueError("include_branch_autocheck: NOT YET DEVELOPED") else: # Situation where branches not included. if new_version > self._current_version: self._update_ready = True self._update_version = new_version self._update_link = link self.save_updater_json() return (True, new_version, link) # If no update, set ready to False from None to show it was checked. 
self._update_ready = False self._update_version = None self._update_link = None return (False, None, None) def set_tag(self, name): """Assign the tag name and url to update to""" tg = None for tag in self._tags: if name == tag["name"]: tg = tag break if tg: new_version = self.version_tuple_from_text(self.tag_latest) self._update_version = new_version self._update_link = self.select_link(self, tg) elif self._include_branches and name in self._include_branch_list: # scenario if reverting to a specific branch name instead of tag tg = name link = self.form_branch_url(tg) self._update_version = name # this will break things self._update_link = link if not tg: raise ValueError("Version tag not found: " + name) def run_update(self, force=False, revert_tag=None, clean=False, callback=None): """Runs an install, update, or reversion of an addon from online source Arguments: force: Install assigned link, even if self.update_ready is False revert_tag: Version to install, if none uses detected update link clean: not used, but in future could use to totally refresh addon callback: used to run function on update completion """ self._json["update_ready"] = False self._json["ignore"] = False # clear ignore flag self._json["version_text"] = dict() if revert_tag is not None: self.set_tag(revert_tag) self._update_ready = True # clear the errors if any self._error = None self._error_msg = None self.print_verbose("Running update") if self._fake_install: # Change to True, to trigger the reload/"update installed" handler. 
self.print_verbose("fake_install=True") self.print_verbose( "Just reloading and running any handler triggers") self._json["just_updated"] = True self.save_updater_json() if self._backup_current is True: self.create_backup() self.reload_addon() self._update_ready = False res = True # fake "success" zip download flag elif not force: if not self._update_ready: self.print_verbose("Update stopped, new version not ready") if callback: callback( self._addon_package, "Update stopped, new version not ready") return "Update stopped, new version not ready" elif self._update_link is None: # this shouldn't happen if update is ready self.print_verbose("Update stopped, update link unavailable") if callback: callback(self._addon_package, "Update stopped, update link unavailable") return "Update stopped, update link unavailable" if revert_tag is None: self.print_verbose("Staging update") else: self.print_verbose("Staging install") res = self.stage_repository(self._update_link) if not res: print("Error in staging repository: " + str(res)) if callback is not None: callback(self._addon_package, self._error_msg) return self._error_msg res = self.unpack_staged_zip(clean) if res < 0: if callback: callback(self._addon_package, self._error_msg) return res else: if self._update_link is None: self.print_verbose("Update stopped, could not get link") return "Update stopped, could not get link" self.print_verbose("Forcing update") res = self.stage_repository(self._update_link) if not res: print("Error in staging repository: " + str(res)) if callback: callback(self._addon_package, self._error_msg) return self._error_msg res = self.unpack_staged_zip(clean) if res < 0: return res # would need to compare against other versions held in tags # run the front-end's callback if provided if callback: callback(self._addon_package) # return something meaningful, 0 means it worked return 0 def past_interval_timestamp(self): if not self._check_interval_enabled: return True # ie this exact feature is disabled 
if "last_check" not in self._json or self._json["last_check"] == "": return True now = datetime.now() last_check = datetime.strptime( self._json["last_check"], "%Y-%m-%d %H:%M:%S.%f") offset = timedelta( days=self._check_interval_days + 30 * self._check_interval_months, hours=self._check_interval_hours, minutes=self._check_interval_minutes) delta = (now - offset) - last_check if delta.total_seconds() > 0: self.print_verbose("Time to check for updates!") return True self.print_verbose("Determined it's not yet time to check for updates") return False def get_json_path(self): """Returns the full path to the JSON state file used by this updater. Will also rename old file paths to addon-specific path if found. """ json_path = os.path.join( self._updater_path, "{}_updater_status.json".format(self._addon_package)) old_json_path = os.path.join(self._updater_path, "updater_status.json") # Rename old file if it exists. try: os.rename(old_json_path, json_path) except FileNotFoundError: pass except Exception as err: print("Other OS error occurred while trying to rename old JSON") print(err) self.print_trace() return json_path def set_updater_json(self): """Load or initialize JSON dictionary data for updater state""" if self._updater_path is None: raise ValueError("updater_path is not defined") elif not os.path.isdir(self._updater_path): os.makedirs(self._updater_path) jpath = self.get_json_path() if os.path.isfile(jpath): with open(jpath) as data_file: self._json = json.load(data_file) self.print_verbose("Read in JSON settings from file") else: self._json = { "last_check": "", "backup_date": "", "update_ready": False, "ignore": False, "just_restored": False, "just_updated": False, "version_text": dict() } self.save_updater_json() def save_updater_json(self): """Trigger save of current json structure into file within addon""" if self._update_ready: if isinstance(self._update_version, tuple): self._json["update_ready"] = True self._json["version_text"]["link"] = 
self._update_link self._json["version_text"]["version"] = self._update_version else: self._json["update_ready"] = False self._json["version_text"] = dict() else: self._json["update_ready"] = False self._json["version_text"] = dict() jpath = self.get_json_path() if not os.path.isdir(os.path.dirname(jpath)): print("State error: Directory does not exist, cannot save json: ", os.path.basename(jpath)) return try: with open(jpath, 'w') as outf: data_out = json.dumps(self._json, indent=4) outf.write(data_out) except: print("Failed to open/save data to json: ", jpath) self.print_trace() self.print_verbose("Wrote out updater JSON settings with content:") self.print_verbose(str(self._json)) def json_reset_postupdate(self): self._json["just_updated"] = False self._json["update_ready"] = False self._json["version_text"] = dict() self.save_updater_json() def json_reset_restore(self): self._json["just_restored"] = False self._json["update_ready"] = False self._json["version_text"] = dict() self.save_updater_json() self._update_ready = None # Reset so you could check update again. 
def ignore_update(self): self._json["ignore"] = True self.save_updater_json() # ------------------------------------------------------------------------- # ASYNC related methods # ------------------------------------------------------------------------- def start_async_check_update(self, now=False, callback=None): """Start a background thread which will check for updates""" if self._async_checking: return self.print_verbose("Starting background checking thread") check_thread = threading.Thread(target=self.async_check_update, args=(now, callback,)) check_thread.daemon = True self._check_thread = check_thread check_thread.start() def async_check_update(self, now, callback=None): """Perform update check, run as target of background thread""" self._async_checking = True self.print_verbose("Checking for update now in background") try: self.check_for_update(now=now) except Exception as exception: print("Checking for update error:") print(exception) self.print_trace() if not self._error: self._update_ready = False self._update_version = None self._update_link = None self._error = "Error occurred" self._error_msg = "Encountered an error while checking for updates" self._async_checking = False self._check_thread = None if callback: self.print_verbose("Finished check update, doing callback") callback(self._update_ready) self.print_verbose("BG thread: Finished check update, no callback") def stop_async_check_update(self): """Method to give impression of stopping check for update. Currently does nothing but allows user to retry/stop blocking UI from hitting a refresh button. This does not actually stop the thread, as it will complete after the connection timeout regardless. If the thread does complete with a successful response, this will be still displayed on next UI refresh (ie no update, or update available). """ if self._check_thread is not None: self.print_verbose("Thread will end in normal course.") # however, "There is no direct kill method on a thread object." 
# better to let it run its course # self._check_thread.stop() self._async_checking = False self._error = None self._error_msg = None # ----------------------------------------------------------------------------- # Updater Engines # ----------------------------------------------------------------------------- class BitbucketEngine: """Integration to Bitbucket API for git-formatted repositories""" def __init__(self): self.api_url = 'https://api.bitbucket.org' self.token = None self.name = "bitbucket" def form_repo_url(self, updater): return "{}/2.0/repositories/{}/{}".format( self.api_url, updater.user, updater.repo) def form_tags_url(self, updater): return self.form_repo_url(updater) + "/refs/tags?sort=-name" def form_branch_url(self, branch, updater): return self.get_zip_url(branch, updater) def get_zip_url(self, name, updater): return "https://bitbucket.org/{user}/{repo}/get/{name}.zip".format( user=updater.user, repo=updater.repo, name=name) def parse_tags(self, response, updater): if response is None: return list() return [ { "name": tag["name"], "zipball_url": self.get_zip_url(tag["name"], updater) } for tag in response["values"]] class GithubEngine: """Integration to Github API""" def __init__(self): self.api_url = 'https://api.github.com' self.token = None self.name = "github" def form_repo_url(self, updater): return "{}/repos/{}/{}".format( self.api_url, updater.user, updater.repo) def form_tags_url(self, updater): if updater.use_releases: return "{}/releases".format(self.form_repo_url(updater)) else: return "{}/tags".format(self.form_repo_url(updater)) def form_branch_list_url(self, updater): return "{}/branches".format(self.form_repo_url(updater)) def form_branch_url(self, branch, updater): return "{}/zipball/{}".format(self.form_repo_url(updater), branch) def parse_tags(self, response, updater): if response is None: return list() return response class GitlabEngine: """Integration to GitLab API""" def __init__(self): self.api_url = 'https://gitlab.com' 
self.token = None self.name = "gitlab" def form_repo_url(self, updater): return "{}/api/v4/projects/{}".format(self.api_url, updater.repo) def form_tags_url(self, updater): return "{}/repository/tags".format(self.form_repo_url(updater)) def form_branch_list_url(self, updater): # does not validate branch name. return "{}/repository/branches".format( self.form_repo_url(updater)) def form_branch_url(self, branch, updater): # Could clash with tag names and if it does, it will download TAG zip # instead of branch zip to get direct path, would need. return "{}/repository/archive.zip?sha={}".format( self.form_repo_url(updater), branch) def get_zip_url(self, sha, updater): return "{base}/repository/archive.zip?sha={sha}".format( base=self.form_repo_url(updater), sha=sha) # def get_commit_zip(self, id, updater): # return self.form_repo_url(updater)+"/repository/archive.zip?sha:"+id def parse_tags(self, response, updater): if response is None: return list() return [ { "name": tag["name"], "zipball_url": self.get_zip_url(tag["commit"]["id"], updater) } for tag in response] class ForgejoEngine: """Integration to Forgejo/Gitea API""" def __init__(self): self.api_url = 'https://git.pointer.click' self.token = None self.name = "forgejo" def form_repo_url(self, updater): return "{}/api/v1/repos/{}/{}".format(self.api_url, updater.user, updater.repo) def form_tags_url(self, updater): return "{}/tags".format(self.form_repo_url(updater)) def form_branch_list_url(self, updater): # does not validate branch name. return "{}/branches".format( self.form_repo_url(updater)) def form_branch_url(self, branch, updater): # Could clash with tag names and if it does, it will download TAG zip # instead of branch zip to get direct path, would need. 
return "{}/archive/{}.zip".format( self.form_repo_url(updater), branch) def get_zip_url(self, sha, updater): return "{base}/archive/{sha}.zip".format( base=self.form_repo_url(updater), sha=sha) # def get_commit_zip(self, id, updater): # return self.form_repo_url(updater)+"/repository/archive.zip?sha:"+id def parse_tags(self, response, updater): if response is None: return list() return [ { "name": tag["name"], "zipball_url": self.get_zip_url(tag["commit"]["sha"], updater) } for tag in response] # ----------------------------------------------------------------------------- # The module-shared class instance, # should be what's imported to other files # ----------------------------------------------------------------------------- Updater = SingletonUpdater()