Skip to content

ota中有tar.gz压缩包升级方式,请问这个压缩包是如何制作的 #2

@Connor-WangK

Description

@Connor-WangK
class FileDecode(object):

    def __init__(self, zip_file, parent_dir="/fota/usr/"):
        self.data = b''
        self.fp = open(zip_file, "rb")
        self.fileData = None
        self.parent_dir = parent_dir
        self.update_file_list = []

    def get_update_files(self):
        return self.update_file_list

    def unzip(self):
        """缓存到内存中"""
        self.fp.seek(10)
        self.fileData = uzlib.DecompIO(self.fp, -15, 1)

    @classmethod
    def _ascii_trip(cls, data):
        return data.decode('ascii').rstrip('\0')

    @classmethod
    def file_size(cls, data):
        """获取真实size数据"""
        size = cls._ascii_trip(data)
        if not len(size):
            return 0
        return int(size, 8)

    @classmethod
    def get_file_name(cls, file_name):
        """获取文件名称"""
        return cls._ascii_trip(file_name)

    def get_data(self):
        return self.fileData.read(0x200)

    def unpack(self):
        try:
            folder_list = set()
            self.data = self.get_data()
            while True:
                if not self.data:
                    break
                size = self.file_size(self.data[124:135])
                file_name = self.get_file_name(self.data[:100])
                full_file_name = self.parent_dir + file_name

                if not size:
                    if len(full_file_name):
                        ql_fs.mkdirs(full_file_name)
                        if full_file_name not in folder_list and full_file_name != self.parent_dir:
                            folder_list.add(full_file_name)
                    else:
                        return
                    self.data = self.get_data()
                else:
                    self.data = self.get_data()
                    update_file = open(full_file_name, "wb+")
                    total_size = size
                    while True:
                        size -= 0x200
                        if size <= 0:
                            update_file.write(self.data[:size + 512])
                            break
                        else:
                            update_file.write(self.data)
                        self.data = self.get_data()
                    self.data = self.get_data()
                    update_file.close()
                    self.update_file_list.append({"file_name": file_name, "size": total_size})
        except Exception as e:
            self.fp.close()
            return False
        else:
            self.fp.close()
            return True


class AppFota(object):

    def __init__(self):
        self.fota = BaseAppFota.new()

    def set_update_flag(self):
        """设置升级标志(当且仅当升级文件下载成功后,且设置了升级标志,重启后才会触发升级,否则不升级。)"""
        self.fota.set_update_flag()

    def download(self, url, file_name):
        """下载单一升级文件

        @url: 待下载文件的url,类型为str。
        @file_name: 本地待升级文件的绝对路径,类型str。
        """
        if self.fota.download(url, file_name) == 0:
            return True
        return False

    def bulk_download(self, info):
        """批量下载升级文件

        @info: 批量下载列表,列表的元素均为包含了url和file_name的字典,类型为list
        """
        return self.fota.bulk_download(info)

    @staticmethod
    def __download_file_from_server(url, path):
        response = request.get(url)
        if response.status_code not in (200, 206):
            return False
        with open(path, 'wb') as f:
            for c in response.content:
                f.write(c)

    @staticmethod
    def __decode_file_to_updater_dir(path, updater_dir):
        fd = FileDecode(path, parent_dir=updater_dir)
        ql_fs.mkdirs(updater_dir)
        fd.unzip()
        if fd.unpack():
            for file in fd.update_file_list:
                update_download_stat('', '/usr/' + file['file_name'], file['size'])
            return True
        else:
            return False

    def download_tar(self, url, path="/usr/temp.tar.gz"):
        """通过压缩文件下载升级。"""
        self.__download_file_from_server(url, path)
        if self.__decode_file_to_updater_dir(
            path,
            self.fota.app_fota_pkg_mount.fota_dir + '/usr/.updater/usr/'
        ):
            uos.remove(path)
            return True
        else:
            return False

这个解压缩函数,对应的压缩包是如何制作的

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions