Python version: >= 3.6
参考: https://www.alibabacloud.com/help/zh/bss-openapi/latest/api-bssopenapi-2017-12-14-overview?spm=a2c63.p38356.0.0.39c6745ffhTCg6
pip install alibabacloud-bssopenapi20171214
from typing import Optional

from alibabacloud_bssopenapi20171214.client import Client as BssOpenApi20171214Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
class Sample:
    """Query the available balance of an Alibaba Cloud account via BSS OpenAPI.

    The script entry point of this file reads the accounts to check from
    ``ali.json``.  Example content of that file::

        {
            "data": [
                {
                    "name": "XXX安卓下载CDN",
                    "country": "EN",
                    "group": "group1",
                    "type": "阿里云",
                    "账号":"XXX@hotmail.com",
                    "keyid": "XXX",
                    "keysecret": "XXX"
                },
                {
                    "name": "XXX页图片oss",
                    "country": "CN",
                    "group": "group1",
                    "type": "阿里云",
                    "账号":"XXX@outlook.com",
                    "keyid": "XXX",
                    "keysecret": "XXX"
                }
            ]
        }
    """

    def __init__(self):
        """Create a Sample; no instance state is set up."""
        pass

    @staticmethod
    def create_client(
        access_key_id: str,
        access_key_secret: str,
        country: str,
    ) -> Optional[BssOpenApi20171214Client]:
        """Build a BSS OpenAPI client bound to the endpoint for *country*.

        Args:
            access_key_id: AccessKey ID of the account.
            access_key_secret: AccessKey secret of the account.
            country: ``'EN'`` selects ``business.ap-southeast-1.aliyuncs.com``;
                ``'CN'`` selects ``business.aliyuncs.com``.

        Returns:
            A configured client, or ``None`` when *country* is neither
            ``'EN'`` nor ``'CN'``.
        """
        config = open_api_models.Config(
            # Required, your AccessKey ID,
            access_key_id=access_key_id,
            # Required, your AccessKey secret,
            access_key_secret=access_key_secret,
        )
        # See https://api.alibabacloud.com/product/BssOpenApi.
        endpoints = {
            'EN': 'business.ap-southeast-1.aliyuncs.com',
            'CN': 'business.aliyuncs.com',
        }
        endpoint = endpoints.get(country)
        if endpoint is None:
            # Unsupported country: made explicit so callers can test for it.
            return None
        config.endpoint = endpoint
        return BssOpenApi20171214Client(config)

    @staticmethod
    def main(
        keyid: str,
        keysecret: str,
        country: str,
    ) -> Optional[str]:
        """Query one account's available balance and format it as a message.

        Args:
            keyid: AccessKey ID of the account.
            keysecret: AccessKey secret of the account.
            country: Endpoint selector passed on to ``create_client``.

        Returns:
            The text ``'当前余额为: <amount> <currency> '`` with the commas
            removed from the amount.  ``None`` is returned when *country* is
            unsupported, when the response carries no available amount, or
            when the query raises (the error is printed, not propagated).
        """
        client = Sample.create_client(keyid, keysecret, country)
        if client is None:
            # Without this guard the failure would only show up below as an
            # AttributeError on None, which hides the real cause.
            print(f'不支持的 country 值: {country}')
            return None

        runtime = util_models.RuntimeOptions()
        try:
            response = client.query_account_balance_with_options(runtime)
            print(response)
            cash = response.body.data.available_amount
            if not cash:
                return None
            cash = cash.replace(',', '')
            currency = response.body.data.currency
            return f'当前余额为: {cash} {currency} '
        except Exception as error:
            # Best effort: a failing account is reported on stdout so the
            # caller can carry on with the remaining accounts.
            print(error)
            return None
def notice(data):
    """Compose one notification text from *data*, print it, and prepare the request.

    Every item of *data* is indexed with the keys ``'root'``, ``'type'``,
    ``'name'``, ``'group'`` and ``'msg'``.  The Telegram request is only
    prepared here: the ``httpx.post`` call is commented out, so nothing is
    sent.
    """
    import httpx

    sections = [
        f'''{item['root']} \n 服务商: {item['type']} \n 用途: {item['name']} \n 商家: {item['group']} \n {item['msg']} \n\n '''
        for item in data
    ]
    # The message template starts with a single space placed ahead of the
    # text gathered so far, so the finished message opens with one space per
    # entry, followed by the entries in order.
    notify_text = ' ' * len(sections) + ''.join(sections)
    print(notify_text)

    notify_url = "https://api.telegram.org/bot6277743361:xxxxxxxx/sendMessage"
    notify_data = {"chat_id": -123, "text": notify_text, "parse_mode": "Markdown"}
    # Push the notification (currently disabled)
    # httpx.post(notify_url, json=notify_data)
if __name__ == '__main__':
    import json

    # Read the account list from ali.json.  Only the file access sits inside
    # the try block, so a FileNotFoundError raised anywhere else is not
    # misreported as a missing ali.json.
    try:
        with open('ali.json', 'r', encoding='UTF-8') as f:
            accounts = json.load(f)['data']
    except FileNotFoundError:
        print('ali.json文件未找到')
    else:
        notice_list = []
        for account in accounts:
            name = account['name']
            country = account['country']
            group = account['group']
            # Named "provider" so the builtin type() is not shadowed.
            provider = account['type']
            root = account['账号']
            keyid = account['keyid']
            keysecret = account['keysecret']
            # Only entries whose type is '阿里云' are queried.
            if provider != '阿里云':
                continue
            res = Sample.main(keyid=keyid, keysecret=keysecret, country=country)
            if res:
                notice_list.append({
                    'name': name,
                    'group': group,
                    'type': provider,
                    'root': root,
                    'keyid': keyid,
                    'keysecret': keysecret,
                    'msg': res,
                })
        # Notify only when at least one account produced a message.
        if notice_list:
            notice(notice_list)
# Sample.main(keyid='xxxxxxx',keysecret='xxxxxxxxx')
# notify_text = f"{url} 连续访问错误{error_counts[url]}次,请检查"
# notify_url = "https://api.telegram.org/bot6277743361:xxxx/sendMessage"
# notify_data = {"chat_id": -00000000, "text": notify_text, "parse_mode": "Markdown"}
# httpx.post(notify_url, json=notify_data)