Python阿里云余额TG报警
version>=python3.6
pip install alibabacloud-bssopenapi20171214 httpx
EN: 国际
CN: 国内
from typing import Optional

from alibabacloud_bssopenapi20171214.client import Client as BssOpenApi20171214Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
class Sample:
    """Query the available balance of an Alibaba Cloud account through the BSS OpenAPI client."""

    # Billing endpoints keyed by the upper-cased ``country`` code.
    # See https://api.alibabacloud.com/product/BssOpenApi.
    _ENDPOINTS = {
        'EN': 'business.ap-southeast-1.aliyuncs.com',
        'CN': 'business.aliyuncs.com',
    }

    def __init__(self):
        pass

    @staticmethod
    def create_client(
        access_key_id: str,
        access_key_secret: str,
        country: str,
    ) -> BssOpenApi20171214Client:
        """Build a BSS OpenAPI client bound to the endpoint selected by ``country``.

        Args:
            access_key_id: AccessKey ID of the account to query.
            access_key_secret: AccessKey secret of the account to query.
            country: ``'EN'`` or ``'CN'``; matched case-insensitively and
                ignoring surrounding whitespace.

        Returns:
            A client whose config endpoint is set for the requested site.

        Raises:
            ValueError: if ``country`` is not one of the supported codes.
        """
        endpoint = Sample._ENDPOINTS.get(str(country).strip().upper())
        if endpoint is None:
            # Previously an unknown code fell through and returned None, which
            # only surfaced later as an AttributeError on the first API call.
            supported = ', '.join(sorted(Sample._ENDPOINTS))
            raise ValueError(
                f'unsupported country {country!r}; expected one of: {supported}'
            )

        config = open_api_models.Config(
            # Required, your AccessKey ID,
            access_key_id=access_key_id,
            # Required, your AccessKey secret,
            access_key_secret=access_key_secret,
        )
        config.endpoint = endpoint
        return BssOpenApi20171214Client(config)

    @staticmethod
    def main(
        keyid: str,
        keysecret: str,
        country: str,
    ) -> Optional[str]:
        """Fetch the account balance and format it as a one-line report.

        Args:
            keyid: AccessKey ID of the account to query.
            keysecret: AccessKey secret of the account to query.
            country: Site code passed to ``create_client``.

        Returns:
            The formatted balance message, or ``None`` when the response
            carries no available amount or when any step fails (the error is
            printed instead of raised).
        """
        try:
            client = Sample.create_client(keyid, keysecret, country)
            runtime = util_models.RuntimeOptions()
            response = client.query_account_balance_with_options(runtime)
            print(response)
            balance = response.body.data
            cash = balance.available_amount
            if not cash:
                return None
            # Strip the thousands separators from the amount string.
            cash = cash.replace(',', '')
            currency = balance.currency
            return f'当前余额为:{cash} {currency}'
        except Exception as error:
            # Deliberately best-effort: the caller loops over several accounts,
            # so a failure on one is reported and must not abort the rest.
            print(error)
            return None
def notice(data, *, send=False):
    """Compose one notification message from the balance reports and print it.

    Args:
        data: Iterable of dicts, each providing the keys ``root``, ``type``,
            ``name``, ``group`` and ``msg``.
        send: When true, also push the message to the Telegram bot endpoint.
            Defaults to ``False``, so the function only prints.

    Raises:
        KeyError: if an entry lacks one of the keys listed above.
        ImportError: if ``send`` is true and ``httpx`` is not installed.
    """
    notify_text = ''.join(
        f"{item['root']}\n服务商:{item['type']}\n用途:{item['name']}\n"
        f"商家:{item['group']}\n{item['msg']}\n\n"
        for item in data
    )
    print(notify_text)

    if not send:
        return

    # Imported lazily so that print-only use does not require httpx.
    import httpx

    notify_url = "https://api.telegram.org/bot6277743361:xxxxxxxx/sendMessage"
    # NOTE(review): with parse_mode "Markdown", characters such as "_" or "*"
    # in the account fields may need escaping — confirm against the Bot API.
    notify_data = {"chat_id": -123, "text": notify_text, "parse_mode": "Markdown"}
    # Push the message.
    httpx.post(notify_url, json=notify_data)
if __name__ == '__main__':
    import json

    # Read the account list from ali.json. Only the file read sits inside the
    # try, so the handler reports a missing config file and nothing else.
    try:
        with open('ali.json', 'r', encoding='UTF-8') as f:
            data = json.load(f)['data']
    except FileNotFoundError:
        print('ali.json文件未找到')
    else:
        notice_list = []
        for entry in data:
            # "provider" instead of "type" to avoid shadowing the builtin.
            provider = entry['type']
            if provider != '阿里云':
                # Entries for other providers are skipped before their
                # credential fields are required.
                continue

            res = Sample.main(
                keyid=entry['keyid'],
                keysecret=entry['keysecret'],
                country=entry['country'],
            )
            if not res:
                continue

            # Only the fields the notification renders are forwarded; the
            # AccessKey pair is deliberately kept out of the payload.
            notice_list.append({
                'name': entry['name'],
                'group': entry['group'],
                'type': provider,
                'root': entry['账号'],
                'msg': res,
            })

        if notice_list:
            notice(notice_list)
# Sample.main(keyid='xxxxxxx',keysecret='xxxxxxxxx')
# notify_text = f"{url} 连续访问错误{error_counts[url]}次,请检查"
# notify_url = "https://api.telegram.org/bot6277743361:xxxx/sendMessage"
# notify_data = {"chat_id": -00000000, "text": notify_text, "parse_mode": "Markdown"}
# httpx.post(notify_url, json=notify_data)
{
"data": [
{
"name": "XXX安卓下载CDN",
"country": "EN",
"group": "group1",
"type": "阿里云",
"账号":"XXX@hotmail.com",
"keyid": "XXX",
"keysecret": "XXX"
},
{
"name": "XXX页图片oss",
"country": "CN",
"group": "group1",
"type": "阿里云",
"账号":"XXX@outlook.com",
"keyid": "XXX",
"keysecret": "XXX"
}
]
}
```
Last updated