Python阿里云余额TG报警

运行环境: Python >= 3.6

参考: https://www.alibabacloud.com/help/zh/bss-openapi/latest/api-bssopenapi-2017-12-14-overview?spm=a2c63.p38356.0.0.39c6745ffhTCg6

pip install alibabacloud-bssopenapi20171214 httpx

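  • EN: 国际站 (配置中 country 填 "EN", 对应端点 business.ap-southeast-1.aliyuncs.com)

  • CN: 国内站 (配置中 country 填 "CN", 对应端点 business.aliyuncs.com)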


from typing import Optional

from alibabacloud_bssopenapi20171214.client import Client as BssOpenApi20171214Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models



class Sample:
    """Query the available balance of an Alibaba Cloud account via BSS OpenAPI."""

    # BSS OpenAPI endpoint per account site, keyed by the upper-cased site code.
    # See https://api.alibabacloud.com/product/BssOpenApi.
    _ENDPOINTS = {
        'EN': 'business.ap-southeast-1.aliyuncs.com',  # international site
        'CN': 'business.aliyuncs.com',                 # domestic (China) site
    }

    def __init__(self):
        pass

    @staticmethod
    def create_client(
        access_key_id: str,
        access_key_secret: str,
        country: str,
    ) -> "BssOpenApi20171214Client":
        """Build a BSS OpenAPI client bound to the endpoint for ``country``.

        Args:
            access_key_id: AccessKey ID of the account (required).
            access_key_secret: AccessKey secret of the account (required).
            country: Site code, ``'EN'`` or ``'CN'``. Matching ignores case
                and surrounding whitespace.

        Returns:
            A client configured with the matching endpoint.

        Raises:
            ValueError: If ``country`` is not a supported site code. The
                check runs before any client object is built, so a typo in
                the configuration is reported directly instead of surfacing
                later as an attribute error on ``None``.
        """
        site = str(country).strip().upper()
        endpoint = Sample._ENDPOINTS.get(site)
        if endpoint is None:
            supported = ', '.join(sorted(Sample._ENDPOINTS))
            raise ValueError(
                f'unsupported country {country!r}; expected one of: {supported}'
            )

        config = open_api_models.Config(
            # Required, your AccessKey ID,
            access_key_id=access_key_id,
            # Required, your AccessKey secret,
            access_key_secret=access_key_secret,
        )
        config.endpoint = endpoint
        return BssOpenApi20171214Client(config)

    @staticmethod
    def main(
        keyid: str,
        keysecret: str,
        country: str,
    ) -> Optional[str]:
        """Fetch the account balance and format it as a notification line.

        Args:
            keyid: AccessKey ID of the account.
            keysecret: AccessKey secret of the account.
            country: Site code accepted by :meth:`create_client`.

        Returns:
            ``'当前余额为:<amount> <currency>'`` with thousands separators
            removed from the amount, or ``None`` when the response carries no
            available amount or when any step fails.
        """
        # Deliberately best-effort: any failure (bad site code, network or
        # API error) is printed and reported as ``None`` so that a caller
        # looping over several accounts can continue with the next one.
        try:
            client = Sample.create_client(keyid, keysecret, country)
            runtime = util_models.RuntimeOptions()
            response = client.query_account_balance_with_options(runtime)
            print(response)

            data = response.body.data
            cash = data.available_amount
            if not cash:
                return None

            # The amount is handled as text; drop ',' separators.
            cash = cash.replace(',', '')
            return f'当前余额为:{cash} {data.currency}'
        except Exception as error:
            print(error)
            return None

def notice(data, send=False):
    """Print the balance report and optionally push it to Telegram.

    Args:
        data: Iterable of dicts; each must provide the keys ``'root'``,
            ``'type'``, ``'name'``, ``'group'`` and ``'msg'``.
        send: When true, POST the report to the Telegram ``sendMessage``
            endpoint. Defaults to ``False``, which only prints the report.

    Raises:
        KeyError: If an entry lacks one of the required keys.
        httpx.HTTPStatusError: With ``send=True``, if Telegram answers with
            an error status.
    """
    # One section per account, joined in a single pass.
    notify_text = ''.join(
        f"{i['root']}\n服务商:{i['type']}\n用途:{i['name']}\n商家:{i['group']}\n{i['msg']}\n\n"
        for i in data
    )
    print(notify_text)

    if not send:
        return

    # Imported lazily so the print-only path works without httpx installed.
    import httpx

    notify_url = "https://api.telegram.org/bot6277743361:xxxxxxxx/sendMessage"
    # NOTE(review): with parse_mode "Markdown", characters such as '_' or '*'
    # in account names may need escaping — confirm against the Bot API docs.
    notify_data = {"chat_id": -123, "text": notify_text, "parse_mode": "Markdown"}
    response = httpx.post(notify_url, json=notify_data)
    response.raise_for_status()
if __name__ == '__main__':
    # Script entry point: read the account list from ali.json, query the
    # balance of every entry whose type is '阿里云', and hand the collected
    # results to notice().
    import json

    try:
        # Only the read happens inside the with-block, so the file is closed
        # before any network call is made.
        with open('ali.json', 'r', encoding='UTF-8') as f:
            accounts = json.load(f)['data']
    except FileNotFoundError:
        print('ali.json文件未找到')
    except json.JSONDecodeError as error:
        print(f'ali.json解析失败: {error}')
    else:
        notice_list = []
        for account in accounts:
            provider = account['type']
            if provider != '阿里云':
                continue

            # Read the display fields up front so a malformed entry fails
            # before the API is called.
            name = account['name']
            group = account['group']
            root = account['账号']

            res = Sample.main(
                keyid=account['keyid'],
                keysecret=account['keysecret'],
                country=account['country'],
            )
            if res:
                # Credentials are intentionally left out of the payload:
                # notice() only reads these five fields.
                notice_list.append({
                    'name': name,
                    'group': group,
                    'type': provider,
                    'root': root,
                    'msg': res,
                })

        if notice_list:
            notice(notice_list)






        # NOTE: the credentials that appeared in the lines below were redacted.
        # Treat the originals as leaked and rotate them.
        # Sample.main(keyid='<ACCESS_KEY_ID>',keysecret='<ACCESS_KEY_SECRET>')
        # notify_text = f"{url} 连续访问错误{error_counts[url]}次,请检查"
        # notify_url = "https://api.telegram.org/bot<BOT_TOKEN>/sendMessage"
        # notify_data = {"chat_id": <CHAT_ID>, "text": notify_text, "parse_mode": "Markdown"}
        # httpx.post(notify_url, json=notify_data)

```json
{
  "data": [
    {
      "name": "XXX安卓下载CDN",
      "country": "EN",
      "group": "group1",
      "type": "阿里云",
      "账号":"XXX@hotmail.com",
      "keyid": "XXX",
      "keysecret": "XXX"
    },
    {
      "name": "XXX页图片oss",
      "country": "CN",
      "group": "group1",
      "type": "阿里云",
      "账号":"XXX@outlook.com",
      "keyid": "XXX",
      "keysecret": "XXX"
    }
  ]
}
```

Last updated