363 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			363 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
import os
 | 
						||
import shutil
 | 
						||
import zipfile
 | 
						||
import tarfile
 | 
						||
import gzip
 | 
						||
import fnmatch
 | 
						||
from typing import Union, List, Optional
 | 
						||
from pathlib import Path
 | 
						||
 | 
						||
 | 
						||
class FileUtils:
 | 
						||
    """
 | 
						||
    文件操作工具类
 | 
						||
 | 
						||
    功能:
 | 
						||
    1. 文件/文件夹拷贝
 | 
						||
    2. 文件/文件夹删除
 | 
						||
    3. 文件/文件夹压缩 (zip, tar, gz)
 | 
						||
    4. 文件/文件夹解压
 | 
						||
    5. 文件查找
 | 
						||
    6. 文件校验
 | 
						||
    """
 | 
						||
 | 
						||
    @staticmethod
 | 
						||
    def copy(src: Union[str, Path], dst: Union[str, Path],
 | 
						||
             overwrite: bool = False, ignore_patterns: Optional[List[str]] = None) -> bool:
 | 
						||
        """
 | 
						||
        拷贝文件或文件夹
 | 
						||
 | 
						||
        :param src: 源路径
 | 
						||
        :param dst: 目标路径
 | 
						||
        :param overwrite: 是否覆盖已存在文件
 | 
						||
        :param ignore_patterns: 忽略的文件模式列表 (如 ['*.tmp', '*.log'])
 | 
						||
        :return: 是否成功
 | 
						||
        """
 | 
						||
        src, dst = Path(src), Path(dst)
 | 
						||
 | 
						||
        def _ignore(path, names):
 | 
						||
            ignored = set()
 | 
						||
            if ignore_patterns:
 | 
						||
                for pattern in ignore_patterns:
 | 
						||
                    ignored.update(fnmatch.filter(names, pattern))
 | 
						||
            return ignored
 | 
						||
 | 
						||
        try:
 | 
						||
            if src.is_file():
 | 
						||
                if dst.exists():
 | 
						||
                    if not overwrite:
 | 
						||
                        return False
 | 
						||
                    if dst.is_dir():
 | 
						||
                        dst = dst / src.name
 | 
						||
                shutil.copy2(src, dst)
 | 
						||
            elif src.is_dir():
 | 
						||
                if dst.exists() and not overwrite:
 | 
						||
                    return False
 | 
						||
                shutil.copytree(src, dst, ignore=_ignore if ignore_patterns else None,
 | 
						||
                                dirs_exist_ok=overwrite)
 | 
						||
            return True
 | 
						||
        except Exception as e:
 | 
						||
            print(f"拷贝失败: {e}")
 | 
						||
            return False
 | 
						||
 | 
						||
    @staticmethod
 | 
						||
    def delete(path: Union[str, Path], recursive: bool = False) -> bool:
 | 
						||
        """
 | 
						||
        删除文件或文件夹
 | 
						||
 | 
						||
        :param path: 要删除的路径
 | 
						||
        :param recursive: 是否递归删除文件夹
 | 
						||
        :return: 是否成功
 | 
						||
        """
 | 
						||
        path = Path(path)
 | 
						||
        try:
 | 
						||
            if path.is_file():
 | 
						||
                path.unlink()
 | 
						||
            elif path.is_dir():
 | 
						||
                if recursive:
 | 
						||
                    shutil.rmtree(path)
 | 
						||
                else:
 | 
						||
                    path.rmdir()
 | 
						||
            return True
 | 
						||
        except Exception as e:
 | 
						||
            print(f"删除失败: {e}")
 | 
						||
            return False
 | 
						||
 | 
						||
    @staticmethod
 | 
						||
    def compress(
 | 
						||
            src: Union[str, Path, List[Union[str, Path]]],
 | 
						||
            dst: Union[str, Path],
 | 
						||
            fmt: str = 'zip',
 | 
						||
            compression_level: int = 6
 | 
						||
    ) -> bool:
 | 
						||
        """
 | 
						||
        压缩文件或文件夹
 | 
						||
 | 
						||
        :param src: 源路径(单个或多个)
 | 
						||
        :param dst: 目标压缩文件路径
 | 
						||
        :param fmt: 压缩格式 (zip, tar, gz)
 | 
						||
        :param compression_level: 压缩级别 (1-9)
 | 
						||
        :return: 是否成功
 | 
						||
        """
 | 
						||
        src_list = [src] if not isinstance(src, list) else src
 | 
						||
        src_list = [Path(s) for s in src_list]
 | 
						||
        dst = Path(dst)
 | 
						||
 | 
						||
        try:
 | 
						||
            if fmt == 'zip':
 | 
						||
                with zipfile.ZipFile(dst, 'w', zipfile.ZIP_DEFLATED, compresslevel=compression_level) as zf:
 | 
						||
                    for src_item in src_list:
 | 
						||
                        if src_item.is_file():
 | 
						||
                            zf.write(src_item, src_item.name)
 | 
						||
                        elif src_item.is_dir():
 | 
						||
                            for root, _, files in os.walk(src_item):
 | 
						||
                                for file in files:
 | 
						||
                                    file_path = Path(root) / file
 | 
						||
                                    arcname = file_path.relative_to(src_item.parent)
 | 
						||
                                    zf.write(file_path, arcname)
 | 
						||
            elif fmt == 'tar':
 | 
						||
                with tarfile.open(dst, 'w:gz') as tf:
 | 
						||
                    for src_item in src_list:
 | 
						||
                        if src_item.is_file():
 | 
						||
                            tf.add(src_item, arcname=src_item.name)
 | 
						||
                        elif src_item.is_dir():
 | 
						||
                            tf.add(src_item, arcname=src_item.name)
 | 
						||
            elif fmt == 'gz':
 | 
						||
                if len(src_list) > 1:
 | 
						||
                    raise ValueError("gz格式只支持压缩单个文件")
 | 
						||
                with open(src_list[0], 'rb') as f_in:
 | 
						||
                    with gzip.open(dst, 'wb', compresslevel=compression_level) as f_out:
 | 
						||
                        shutil.copyfileobj(f_in, f_out)
 | 
						||
            else:
 | 
						||
                raise ValueError(f"不支持的压缩格式: {fmt}")
 | 
						||
            return True
 | 
						||
        except Exception as e:
 | 
						||
            print(f"压缩失败: {e}")
 | 
						||
            return False
 | 
						||
 | 
						||
    @staticmethod
 | 
						||
    def decompress(
 | 
						||
            src: Union[str, Path],
 | 
						||
            dst: Union[str, Path] = None,
 | 
						||
            fmt: str = None
 | 
						||
    ) -> bool:
 | 
						||
        """
 | 
						||
        解压文件
 | 
						||
 | 
						||
        :param src: 压缩文件路径
 | 
						||
        :param dst: 解压目标路径 (默认为当前目录)
 | 
						||
        :param fmt: 压缩格式 (自动检测如果为None)
 | 
						||
        :return: 是否成功
 | 
						||
        """
 | 
						||
        src = Path(src)
 | 
						||
        dst = Path(dst) if dst else Path.cwd()
 | 
						||
 | 
						||
        # 自动检测格式
 | 
						||
        if fmt is None:
 | 
						||
            if src.suffix == '.zip':
 | 
						||
                fmt = 'zip'
 | 
						||
            elif src.suffix == '.tar' or src.suffixes[-2:] == ['.tar', '.gz']:
 | 
						||
                fmt = 'tar'
 | 
						||
            elif src.suffix == '.gz':
 | 
						||
                fmt = 'gz'
 | 
						||
            else:
 | 
						||
                raise ValueError("无法自动识别压缩格式,请指定fmt参数")
 | 
						||
 | 
						||
        try:
 | 
						||
            dst.mkdir(parents=True, exist_ok=True)
 | 
						||
 | 
						||
            if fmt == 'zip':
 | 
						||
                with zipfile.ZipFile(src, 'r') as zf:
 | 
						||
                    zf.extractall(dst)
 | 
						||
            elif fmt == 'tar':
 | 
						||
                with tarfile.open(src, 'r:*') as tf:
 | 
						||
                    tf.extractall(dst)
 | 
						||
            elif fmt == 'gz':
 | 
						||
                with gzip.open(src, 'rb') as f_in:
 | 
						||
                    output_path = dst / src.stem
 | 
						||
                    with open(output_path, 'wb') as f_out:
 | 
						||
                        shutil.copyfileobj(f_in, f_out)
 | 
						||
            else:
 | 
						||
                raise ValueError(f"不支持的压缩格式: {fmt}")
 | 
						||
            return True
 | 
						||
        except Exception as e:
 | 
						||
            print(f"解压失败: {e}")
 | 
						||
            return False
 | 
						||
 | 
						||
    @staticmethod
 | 
						||
    def find_files(
 | 
						||
            root: Union[str, Path],
 | 
						||
            pattern: str = '*',
 | 
						||
            recursive: bool = True
 | 
						||
    ) -> List[Path]:
 | 
						||
        """
 | 
						||
        查找文件
 | 
						||
 | 
						||
        :param root: 搜索根目录
 | 
						||
        :param pattern: 文件名模式 (如 '*.txt')
 | 
						||
        :param recursive: 是否递归搜索
 | 
						||
        :return: 匹配的文件路径列表
 | 
						||
        """
 | 
						||
        root = Path(root)
 | 
						||
        matches = []
 | 
						||
 | 
						||
        if recursive:
 | 
						||
            for path in root.rglob(pattern):
 | 
						||
                if path.is_file():
 | 
						||
                    matches.append(path)
 | 
						||
        else:
 | 
						||
            for path in root.glob(pattern):
 | 
						||
                if path.is_file():
 | 
						||
                    matches.append(path)
 | 
						||
 | 
						||
        return matches
 | 
						||
 | 
						||
    @staticmethod
 | 
						||
    def calculate_size(path: Union[str, Path]) -> int:
 | 
						||
        """
 | 
						||
        计算文件或文件夹大小(字节)
 | 
						||
 | 
						||
        :param path: 路径
 | 
						||
        :return: 大小(字节)
 | 
						||
        """
 | 
						||
        path = Path(path)
 | 
						||
        if path.is_file():
 | 
						||
            return path.stat().st_size
 | 
						||
        elif path.is_dir():
 | 
						||
            return sum(f.stat().st_size for f in path.rglob('*') if f.is_file())
 | 
						||
        return 0
 | 
						||
 | 
						||
    @staticmethod
 | 
						||
    def compare_files(
 | 
						||
            file1: Union[str, Path],
 | 
						||
            file2: Union[str, Path],
 | 
						||
            chunk_size: int = 8192
 | 
						||
    ) -> bool:
 | 
						||
        """
 | 
						||
        比较两个文件内容是否相同
 | 
						||
 | 
						||
        :param file1: 文件1路径
 | 
						||
        :param file2: 文件2路径
 | 
						||
        :param chunk_size: 读取块大小
 | 
						||
        :return: 是否相同
 | 
						||
        """
 | 
						||
        file1, file2 = Path(file1), Path(file2)
 | 
						||
 | 
						||
        if file1.stat().st_size != file2.stat().st_size:
 | 
						||
            return False
 | 
						||
 | 
						||
        with open(file1, 'rb') as f1, open(file2, 'rb') as f2:
 | 
						||
            while True:
 | 
						||
                b1 = f1.read(chunk_size)
 | 
						||
                b2 = f2.read(chunk_size)
 | 
						||
                if b1 != b2:
 | 
						||
                    return False
 | 
						||
                if not b1:
 | 
						||
                    return True
 | 
						||
 | 
						||
    @staticmethod
 | 
						||
    def get_md5(file_path: Union[str, Path], chunk_size: int = 8192) -> str:
 | 
						||
        """
 | 
						||
        计算文件的MD5哈希值
 | 
						||
 | 
						||
        :param file_path: 文件路径
 | 
						||
        :param chunk_size: 读取块大小
 | 
						||
        :return: MD5哈希值
 | 
						||
        """
 | 
						||
        import hashlib
 | 
						||
        file_path = Path(file_path)
 | 
						||
        md5 = hashlib.md5()
 | 
						||
 | 
						||
        with open(file_path, 'rb') as f:
 | 
						||
            while chunk := f.read(chunk_size):
 | 
						||
                md5.update(chunk)
 | 
						||
        return md5.hexdigest()
 | 
						||
 | 
						||
    @staticmethod
 | 
						||
    def move(
 | 
						||
            src: Union[str, Path],
 | 
						||
            dst: Union[str, Path],
 | 
						||
            overwrite: bool = False,
 | 
						||
            ignore_patterns: Optional[List[str]] = None,
 | 
						||
            create_parents: bool = True
 | 
						||
    ) -> Optional[bool]:
 | 
						||
        """
 | 
						||
        移动文件或文件夹
 | 
						||
 | 
						||
        :param create_parents:
 | 
						||
        :param src: 源路径
 | 
						||
        :param dst: 目标路径
 | 
						||
        :param overwrite: 是否覆盖已存在文件
 | 
						||
        :param ignore_patterns: 忽略的文件模式列表 (如 ['*.tmp', '*.log'])
 | 
						||
        :return: 是否成功
 | 
						||
 | 
						||
        示例:
 | 
						||
        >>> FileUtils.move('a.txt', 'dir/b.txt')  # 移动并重命名文件
 | 
						||
        >>> FileUtils.move('dir1', 'dir2')       # 移动文件夹
 | 
						||
        """
 | 
						||
        src, dst = Path(src), Path(dst)
 | 
						||
 | 
						||
        try:
 | 
						||
            # 处理目标已存在的情况
 | 
						||
            if dst.exists():
 | 
						||
                if not overwrite:
 | 
						||
                    return False
 | 
						||
                FileUtils.delete(dst)  # 先删除目标
 | 
						||
 | 
						||
            if create_parents:
 | 
						||
                if src.is_file():
 | 
						||
                    dst.parent.mkdir(parents=True, exist_ok=True)
 | 
						||
                elif src.is_dir():
 | 
						||
                    dst.mkdir(parents=True, exist_ok=True)
 | 
						||
 | 
						||
            # 移动文件
 | 
						||
            if src.is_file():
 | 
						||
                shutil.move(str(src), str(dst))
 | 
						||
                return True
 | 
						||
 | 
						||
            # 移动文件夹(带忽略模式)
 | 
						||
            elif src.is_dir():
 | 
						||
                # 先拷贝再删除源目录
 | 
						||
                if FileUtils.copy(src, dst, overwrite, ignore_patterns):
 | 
						||
                    FileUtils.delete(src, recursive=True)
 | 
						||
                    return True
 | 
						||
                return False
 | 
						||
 | 
						||
        except Exception as e:
 | 
						||
            print(f"移动失败: {e}")
 | 
						||
            return False
 | 
						||
 | 
						||
 | 
						||
# 使用示例
 | 
						||
if __name__ == "__main__":
 | 
						||
    # 1. 拷贝示例
 | 
						||
    FileUtils.copy('source.txt', 'backup.txt')
 | 
						||
    FileUtils.copy('mydir', 'mydir_backup', ignore_patterns=['*.tmp'])
 | 
						||
 | 
						||
    # 2. 删除示例
 | 
						||
    FileUtils.delete('backup.txt')
 | 
						||
    FileUtils.delete('mydir_backup', recursive=True)
 | 
						||
 | 
						||
    # 3. 压缩示例
 | 
						||
    FileUtils.compress('mydir', 'mydir.zip')
 | 
						||
    FileUtils.compress(['file1.txt', 'file2.txt'], 'files.tar', fmt='tar')
 | 
						||
 | 
						||
    # 4. 解压示例
 | 
						||
    FileUtils.decompress('mydir.zip', 'extracted')
 | 
						||
 | 
						||
    # 5. 查找文件示例
 | 
						||
    txt_files = FileUtils.find_files('.', '*.txt')
 | 
						||
    print(f"找到的文本文件: {txt_files}")
 | 
						||
 | 
						||
    # 6. 计算大小示例
 | 
						||
    size = FileUtils.calculate_size('mydir')
 | 
						||
    print(f"文件夹大小: {size} 字节")
 | 
						||
 | 
						||
    # 7. 比较文件示例
 | 
						||
    same = FileUtils.compare_files('file1.txt', 'file2.txt')
 | 
						||
    print(f"文件是否相同: {same}")
 | 
						||
 | 
						||
    # 8. 计算MD5示例
 | 
						||
    md5 = FileUtils.get_md5('file1.txt')
 | 
						||
    print(f"文件MD5: {md5}")
 |