自动下载阿里云OSS桶文件

# -*- coding: utf-8 -*-
# Author: xxx
# Datetime: 2024-02-09 12:44:10
# Example: pip3 install --upgrade pip && pip3 install oss2 && python3 xxx.py

import os
import hashlib
import oss2
import requests
import sys

# Set the interpreter recursion limit explicitly.
sys.setrecursionlimit(1000)


# Alibaba Cloud OSS access credentials (placeholders: replace before running)
access_key_id = 'you-access-id'
access_key_secret = 'you-access-secret'
# Name of the OSS bucket to back up
bucket_name = 'you-bucket-name'
# Region of the OSS bucket; used to build the endpoint URL below
oss_region = 'oss-cn-hongkong'
# Local root directory that receives the downloaded objects
local_root_directory = '/home/oss_backup/'
# Telegram Bot API token and target chat_id (placeholders: replace before running)
telegram_token = "you-telegram-token"
# The placeholder must be quoted: unquoted, `you-telegram-id` is parsed as the
# subtraction `you - telegram - id` and raises NameError when the module loads.
chat_id = "you-telegram-id"

# Create the OSS credential and bucket client objects
auth = oss2.Auth(access_key_id, access_key_secret)
bucket = oss2.Bucket(auth, f'http://{oss_region}.aliyuncs.com', bucket_name)

# Running totals of downloaded and skipped files, reported at the end of the run
downloaded_files = 0
skipped_files = 0

def download_file(object_key, local_path):
    """Download one object from the module-level ``bucket`` to a local file.

    Increments the module-level ``downloaded_files`` counter after the
    download call returns.

    Args:
        object_key: Key of the object inside the bucket.
        local_path: Destination path on the local filesystem.
    """
    global downloaded_files
    # An object key such as "a/b/c.txt" can exist without any "a/" or "a/b/"
    # placeholder objects, in which case nothing has created the local parent
    # directory yet. Create it here so the write cannot fail on a missing path.
    parent_directory = os.path.dirname(local_path)
    if parent_directory:
        os.makedirs(parent_directory, exist_ok=True)
    bucket.get_object_to_file(object_key, local_path)
    downloaded_files += 1


def create_local_directory(directory):
    """Create ``directory`` (and any missing parents) on the local filesystem.

    Does nothing when the directory already exists.
    """
    if os.path.isdir(directory):
        return
    os.makedirs(directory, exist_ok=True)

def calculate_md5(file_path):
    # 计算文件的 MD5 值
    hasher = hashlib.md5()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hasher.update(chunk)
    return hasher.hexdigest()

def send_telegram_message(message):
    """Send ``message`` to the configured Telegram chat via the Bot API.

    Delivery is best-effort: a network failure or a non-success HTTP status is
    reported on stdout instead of being raised, so a failed notification does
    not abort the script.

    Args:
        message: Text to send. The chat and bot are taken from the
            module-level ``chat_id`` and ``telegram_token``.
    """
    url = f"https://api.telegram.org/bot{telegram_token}/sendMessage"
    params = {
        'chat_id': chat_id,
        'text': message
    }
    try:
        # requests applies no timeout by default; without one a stalled
        # connection would block the script indefinitely.
        response = requests.post(url, params=params, timeout=30)
    except requests.RequestException as exc:
        # Print only the exception type: its text can embed the request URL,
        # which contains the bot token.
        print(f"Failed to send Telegram message: {type(exc).__name__}")
        return
    if not response.ok:
        print(f"Telegram API rejected the message: HTTP {response.status_code}")


def download_object(prefix, local_directory, object_key):
    """Download ``object_key`` unless an identical local copy already exists.

    The local path is ``local_directory`` joined with ``object_key``. When a
    file is already there, its MD5 digest is compared (case-insensitively)
    with the object's ETag stripped of double quotes; on a match the file is
    skipped and ``skipped_files`` is incremented, otherwise it is downloaded
    again.

    Args:
        prefix: Accepted for the callers' sake; not used by this function.
        local_directory: Local root directory for the download.
        object_key: Key of the object inside the module-level ``bucket``.
    """
    global skipped_files
    target_path = os.path.join(local_directory, object_key)

    already_current = False
    if os.path.exists(target_path):
        local_digest = calculate_md5(target_path).lower()
        remote_etag = bucket.get_object_meta(object_key).etag
        already_current = local_digest == remote_etag.strip('"').lower()

    if already_current:
        print(f"File {target_path} already exists and MD5 matches, skipping.")
        skipped_files += 1
        return

    # Either the file is missing locally or its digest differs from the ETag.
    download_file(object_key, target_path)
    print(f"File {target_path} downloaded.")



def list_objects_in_directory(object_list, local_directory):
    """Mirror every object under each given directory prefix.

    Keys ending in ``/`` are created as local directories; every other key is
    passed to ``download_object``.

    Args:
        object_list: Iterable of key prefixes (directory keys) to process.
        local_directory: Local root directory that receives the objects.
    """
    for directory_prefix in object_list:
        # bucket.list_objects() returns only a single page of results, so
        # larger directories were silently truncated. ObjectIterator follows
        # the continuation markers and yields every matching object.
        for obj in oss2.ObjectIterator(bucket, prefix=directory_prefix):
            object_key = str(obj.key)
            if object_key.endswith('/'):
                create_local_directory(os.path.join(local_directory, object_key))
            else:
                print(f"OSS File: {obj.key}")
                download_object(directory_prefix, local_directory, object_key)


def traverse_bucket_objects(prefix, local_directory=local_root_directory):
    """Mirror every object whose key starts with ``prefix`` into ``local_directory``.

    Keys ending in ``/`` are created as local directories; every other key is
    passed to ``download_object``, which skips files whose local copy already
    matches.

    Args:
        prefix: Key prefix to list; ``''`` covers the whole bucket.
        local_directory: Local root directory that receives the objects.
    """
    # No delimiter is passed, so the iterator already yields every key under
    # the prefix, including keys nested inside "directories". A second
    # per-directory pass would only re-check the same files and count each of
    # them again in skipped_files, so a single pass is made.
    for obj in oss2.ObjectIterator(bucket, prefix=prefix):
        object_key = str(obj.key)
        print(f"OSS File: {object_key}")
        if object_key.endswith('/'):
            # Directory placeholder object: create the matching local directory.
            create_local_directory(os.path.join(local_directory, object_key))
        else:
            # Regular object: download unless an identical local copy exists.
            download_object(prefix, local_directory, object_key)



def main():
    """Mirror the whole bucket, then report the totals through Telegram."""
    traverse_bucket_objects('')
    send_telegram_message(
        f"Download finished. Downloaded {downloaded_files} files, skipped {skipped_files} files."
    )


if __name__ == "__main__":
    main()

Last updated