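'''
Setup notes for this service.

Install dependencies:
## pip3 install fastapi==0.108.0 pyTelegramBotAPI==4.14.1 telebot==0.0.5 aiofiles==23.2.1 asyncio==3.4.3 loguru==0.7.2

NOTE(review): the code also imports uvicorn, which is missing from the pip line above.
NOTE(review): asyncio ships with the Python 3 standard library and the code uses
asyncio.TaskGroup; the asyncio==3.4.3 pin looks unnecessary - confirm before installing.
NOTE(review): telebot==0.0.5 and pyTelegramBotAPI both appear to provide a `telebot`
module - verify that only one of them is needed.

Environment variables read at startup:
export FASTAPI_BLACKLIST_PATH=/etc/nginx/blacklist.conf
export FASTAPI_NGINX_CMD_PATH=/usr/sbin/nginx
export FASTAPI_TG_ID=-xxxxx
export FASTAPI_TG_TOKEN=xxx:xxxx
export FASTAPI_TG_ENABLED=true
'''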
import asyncio
import ipaddress
import os
import sys
import time
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Union

import aiofiles
import telebot
import uvicorn
from fastapi import (
    BackgroundTasks,
    Depends,
    FastAPI,
    HTTPException,
    Query,
    status,
)
from fastapi.responses import JSONResponse, Response
from loguru import logger
# ----------------------------- global log ---------------------------------------
# Both handlers write to the same rotating file; the shared values live here
# so the two handler entries cannot drift apart.
# NOTE(review): two TimedRotatingFileHandler instances on one file may
# interfere with each other at rollover - confirm this is intended.
_LOG_FILE = "./main.log"
_ROTATING_FILE_HANDLER = "logging.handlers.TimedRotatingFileHandler"

# dictConfig-style logging schema passed to uvicorn via ``log_config``.
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {
            "()": "uvicorn.logging.DefaultFormatter",
            "fmt": " %(levelprefix)s %(message)s ",
            "use_colors": None,
        },
        "access": {
            "()": "uvicorn.logging.AccessFormatter",
            "fmt": ' %(asctime)s %(levelprefix)s %(client_addr)s - " %(request_line)s " %(status_code)s ',
        },
    },
    # One file handler per formatter, named after the formatter it uses.
    "handlers": {
        name: {
            "formatter": name,
            "class": _ROTATING_FILE_HANDLER,
            "filename": _LOG_FILE,
        }
        for name in ("default", "access")
    },
    "loggers": {
        "": {"handlers": ["default"], "level": "INFO"},
        "uvicorn.error": {"level": "INFO"},
        "uvicorn.access": {"handlers": ["access"], "level": "INFO", "propagate": False},
    },
}
# ----------------------------- global env ---------------------------------------
fastapi_nginx_path = os . getenv ( "FASTAPI_NGINX_CMD_PATH" , '/usr/sbin/nginx' )
fastapi_blacklist_path = os . getenv ( "FASTAPI_BLACKLIST_PATH" , 'deny_list.txt' )
fastapi_tg_id = os . getenv ( "FASTAPI_TG_ID" , '' )
fastapi_tg_token = os . getenv ( "FASTAPI_TG_TOKEN" , '' )
fastapi_tg_enabled = os . getenv ( "FASTAPI_TG_ENABLED" , False )
suffix = "0;\n"
debug_env = False
def notify_now() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``.

    The value comes from ``datetime.now()`` (naive local time).
    """
    # The format previously had a space between "%m-" and "%d", which
    # rendered dates such as "2024-01- 05".
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# --------------------------- add current path to env for pyinstaller ---------------------------
if not debug_env:
    # PyInstaller support: put the current working directory on the module
    # search path (and keep it available as ``root_path``).
    sys.path.append(root_path := os.getcwd())
# ----------------------------- Telegram notify ---------------------------------------
async def Telegram(TG_ID, TG_TOKEN, text):
    """Send *text* to a Telegram chat; failures are logged, never raised.

    Args:
        TG_ID: Target chat id.
        TG_TOKEN: Bot token used to create the ``telebot.TeleBot`` client.
        text: Message body, sent with ``parse_mode="Markdown"``.
    """
    # NOTE(review): with Markdown parsing, text containing "_" or "*" (file
    # paths, variable names) may be rejected by Telegram - verify.

    def _send():
        bot = telebot.TeleBot(TG_TOKEN)
        bot.send_message(TG_ID, text, parse_mode="Markdown")

    try:
        # send_message is a blocking call; running it directly inside this
        # coroutine would stall the event loop for the whole HTTP round trip.
        await asyncio.to_thread(_send)
    except Exception as e:
        # Best effort: a failed notification must not break the caller.
        logger.error(str(e))
async def sendToTelegram(msg):
    """Forward *msg* to the configured Telegram chat when notifications are on.

    When ``fastapi_tg_enabled`` is falsy nothing is sent and a warning is
    logged instead.
    """
    if not fastapi_tg_enabled:
        logger.warning('You not enabled telegram, Please set env FASTAPI_TG_ENABLED for true')
        return
    await Telegram(fastapi_tg_id, fastapi_tg_token, msg)
# ----------------------------- initialization ---------------------------------------
@asynccontextmanager
async def initialization(app: FastAPI):
    """Lifespan handler: log the configuration, validate paths, send notices.

    Startup (before ``yield``): logs the effective environment, and, unless
    ``debug_env`` is set, exits the process with status 1 when the nginx
    binary or the blacklist file does not exist; then sends a startup notice.
    Shutdown (after ``yield``): logs and sends a shutdown notice.

    Args:
        app: The FastAPI application; not used by this handler.
    """
    # startup loading
    start_msg = f" {notify_now()} startup app fastapi dir: {os.getcwd()} ===>>> 新交易所"
    logger.info(start_msg)
    logger.success("====================== ENV ==========================================")
    logger.info(f" export FASTAPI_BLACKLIST_PATH= {fastapi_blacklist_path} ")
    logger.info(f" export FASTAPI_NGINX_CMD_PATH= {fastapi_nginx_path} ")
    logger.info(f" export FASTAPI_TG_ID= {fastapi_tg_id} ")
    # The bot token is a credential: record only whether it is set, never
    # the value itself.
    masked_token = "***" if fastapi_tg_token else ""
    logger.info(f" export FASTAPI_TG_TOKEN= {masked_token} ")
    logger.info(f" export FASTAPI_TG_ENABLED= {fastapi_tg_enabled} ")
    logger.success("=====================================================================")
    if not debug_env:
        if not os.path.exists(fastapi_nginx_path):
            logger.error(f" {fastapi_nginx_path} not found, Are you sure already install nginx?")
            sys.exit(1)
        if not os.path.exists(fastapi_blacklist_path):
            logger.error(f" {fastapi_blacklist_path} not found, Please create this path name file, default current path")
            sys.exit(1)
    await sendToTelegram(start_msg)
    yield
    # shutdown loading
    # Built here rather than at startup so the notice carries the shutdown
    # time instead of the startup time.
    end_msg = f" {notify_now()} shutdown app fastapi dir: {os.getcwd()} ===>>> 新交易所"
    logger.info(end_msg)
    await sendToTelegram(end_msg)
# ----------------------------- Log input format --------------------------------------------------
if debug_env:
    # Debug runs additionally send loguru output to ./logs/<date>_error.log.
    log_path = os.path.join(os.getcwd(), 'logs')
    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs(log_path, exist_ok=True)
    # The date format and file name previously contained stray spaces,
    # producing names such as " 2024-01- 05  _error.log".
    log_path_error = os.path.join(log_path, f'{time.strftime("%Y-%m-%d")}_error.log')
    logger.add(log_path_error, rotation="00:00", retention="5 days", enqueue=True)
# ----------------------------- Return interface encapsulation --------------------------------------
def Reponse(*, code=200, msg="success", data: Union[list, dict, str] = None) -> Response:
    """Build the JSON envelope returned by the endpoints.

    The HTTP status is always 200; the outcome is carried in the body.

    Args:
        code: Application-level result code placed in the body.
        msg: Human-readable result message.
        data: Optional payload.

    Returns:
        A ``JSONResponse`` with keys ``code``, ``msg``, ``data`` and ``time``.
    """
    envelope = dict(code=code, msg=msg, data=data, time=notify_now())
    return JSONResponse(status_code=status.HTTP_200_OK, content=envelope)
# ------------------------------ validate params-------------------------------------------------------
def validate_ip(ip: str = Query(...)):
    """Dependency that parses the required ``ip`` query parameter.

    Returns:
        The object produced by ``ipaddress.ip_address(ip)``.

    Raises:
        HTTPException: Status 400 when *ip* is not a valid IP address.
    """
    try:
        parsed = ipaddress.ip_address(ip)
    except ValueError:
        raise HTTPException(status_code=400, detail=f" {ip} Invalid IP Address")
    return parsed
# ----------------------------- Reload nginx service -----------------------------------------------
async def _run_nginx(*args: str) -> int:
    """Run the configured nginx binary with *args* and return its exit code."""
    # Executed without a shell and awaited, so the event loop keeps serving
    # requests while nginx runs.
    process = await asyncio.create_subprocess_exec(fastapi_nginx_path, *args)
    return await process.wait()


async def reload_nginx():
    """Validate the nginx configuration and, if it is valid, reload nginx.

    Runs ``nginx -t`` first and ``nginx -s reload`` only when the check exits
    with 0.  Every outcome is logged and forwarded through
    ``sendToTelegram``; errors are reported, not raised.
    """
    check_msg = f" {notify_now()} {fastapi_nginx_path} -t ===>>> "
    reload_msg = f" {notify_now()} {fastapi_nginx_path} -s reload ===>>> "
    success_msg = "success"
    error_msg = "error"
    # Joined with os.path.join: the hint previously had a space inside the
    # path ("<cwd> /main.log").
    log_file = os.path.join(os.getcwd(), "main.log")
    try:
        if await _run_nginx("-t") != 0:
            # A failed configuration check is an error, like a failed reload.
            logger.error(f" {check_msg} error")
            await sendToTelegram(f" {check_msg} {error_msg} \n Please check the current program log: {log_file}")
            return
        logger.info(f" {check_msg} {success_msg} ")
        await sendToTelegram(f" {check_msg} {success_msg} ")

        if await _run_nginx("-s", "reload") != 0:
            logger.error(f" {reload_msg} {error_msg} ")
            await sendToTelegram(f" {reload_msg} {error_msg} \n Please check the current program log: {log_file}")
            return
        logger.info(f" {reload_msg} {success_msg} ")
        await sendToTelegram(f" {reload_msg} {success_msg} ")
    except Exception as e:
        # Boundary of a background task: report the failure instead of
        # letting it propagate.
        logger.error(str(e))
        await sendToTelegram(f" {notify_now()} Failed to trigger blacklist operation: {str(e)} , \n Please check the current program log: {log_file}")
# --------------------------------------------------------------------------------------------------
# ASGI application object; `initialization` is installed as its lifespan handler.
app = FastAPI (lifespan = initialization)
async def write_file(file_path: str, data: str):
    """Overwrite *file_path* with *data* (UTF-8).

    Args:
        file_path: File to replace.
        data: Complete new content of the file.
    """
    logger.info(f'cover {file_path} file')
    async with aiofiles.open(file_path, "w", encoding="utf-8") as file:
        # data is a single string; writelines() would iterate it character
        # by character.
        await file.write(data)
# Delete deny ip
async def acl_allow(file_path: str, data: str):
    """Remove every blacklist line for *data* from *file_path*.

    Args:
        file_path: Blacklist file to edit.
        data: IP address whose entries are removed.

    Returns:
        ``(True, message)`` when at least one line was removed and the file
        was rewritten; ``(False, message)`` when no line matched or the file
        could not be read or written.
    """
    allow_msg = f' {notify_now()} acl delete {data} from ===>>> {file_path} '
    logger.info(allow_msg)
    # Lines are compared on their whitespace-separated tokens. An entry
    # string with padding around it can never equal a line returned by
    # readlines(), which always ends at the newline.
    target = f"{data} {suffix}".split()
    try:
        async with aiofiles.open(file_path, "r", encoding="utf-8") as fp:
            contents = await fp.readlines()
        # Drop all matching lines in one pass.
        remaining = [line for line in contents if line.split() != target]
        if len(remaining) == len(contents):
            reason = f"{data} not in blocklist"
            logger.warning(reason)
            return False, allow_msg + f" warning ===>>> {reason} "
        await write_file(file_path=file_path, data=''.join(remaining))
        return True, allow_msg + " success"
    except OSError as e:
        # Same contract as acl_deny: report I/O failures to the caller
        # instead of raising.
        logger.error(str(e))
        return False, allow_msg + f" error ===>>> {str(e)} "
# Add deny ip
async def acl_deny(file_path: str, data: str):
    """Append a blacklist line for *data* to *file_path* unless one exists.

    Args:
        file_path: Blacklist file to edit.
        data: IP address to block.

    Returns:
        ``(True, message)`` when the entry was appended; ``(False, message)``
        when it was already present or an error occurred.
    """
    deny_msg = f' {notify_now()} acl deny {data} to ===>>> {file_path} '
    logger.info(deny_msg)
    # Exactly one line per entry: address, a space, then the suffix (which
    # ends with the newline).  No padding around it.
    entry = f"{data} {suffix}"
    try:
        async with aiofiles.open(file_path, "r", encoding="utf-8") as fp:
            contents = await fp.readlines()
        # Token comparison keeps the duplicate check independent of the
        # amount of whitespace on existing lines.
        target = entry.split()
        if any(line.split() == target for line in contents):
            logger.warning(deny_msg + f" warning ===>>> already in blocklist")
            return False, deny_msg + f" warning ===>>> already in blocklist"
        if contents and not contents[-1].endswith("\n"):
            # The file lacks a final newline; start a fresh line so the new
            # entry is not glued to the previous one.
            entry = "\n" + entry
        async with aiofiles.open(file_path, "a", encoding="utf-8") as file:
            await file.write(entry)
        return True, deny_msg + " success"
    except Exception as e:
        # Reported through the return value; logged so the failure is also
        # visible when Telegram notifications are off.
        logger.error(str(e))
        deny_msg = deny_msg + f" error ===>>> {str(e)} "
        return False, deny_msg
@app.get("/acl/deny")
async def deny(background_tasks: BackgroundTasks, ip: str = Depends(validate_ip)):
    """Block *ip*: add it to the blacklist file and schedule an nginx reload.

    ``ip`` is validated by ``validate_ip`` (original author's note: the
    validation slows the request down).

    Returns:
        The default success envelope when the entry was added, otherwise an
        envelope with ``code=400`` and the reason as ``msg``.
    """
    # A direct await replaces the single-task TaskGroup and its three
    # result() calls.
    added, message = await acl_deny(fastapi_blacklist_path, ip)
    if fastapi_tg_enabled:
        # Sent as a background task because the notification slows the
        # response down.
        background_tasks.add_task(sendToTelegram, message)
    if added:
        background_tasks.add_task(reload_nginx)
        return Reponse()
    return Reponse(code=400, msg=message)
@app.get("/acl/allow")
async def allow(background_tasks: BackgroundTasks, ip: str = Depends(validate_ip)):
    """Unblock *ip*: remove it from the blacklist file and schedule a reload.

    ``ip`` goes through ``validate_ip`` exactly like ``/acl/deny``, so the
    address looked up here has the same normalised form as the one that was
    stored, and missing or malformed values are rejected up front.

    Returns:
        The default success envelope when an entry was removed, otherwise an
        envelope with ``code=404`` and the reason in ``msg``.
    """
    removed, message = await acl_allow(fastapi_blacklist_path, ip)
    if fastapi_tg_enabled:
        # Sent as a background task because the notification slows the
        # response down.
        background_tasks.add_task(sendToTelegram, message)
    if removed:
        background_tasks.add_task(reload_nginx)
        return Reponse()
    return Reponse(code=404, msg=f"Error: Not Found ===>>> {message} ")
if __name__ == "__main__":
    # NOTE(review): 'main:app' is an import string, so this presumably
    # expects the file to be importable as module `main` - confirm.
    server_options = dict(
        host='0.0.0.0',
        port=8000,
        reload=debug_env,
        workers=1,
        log_config=LOGGING_CONFIG,
        log_level="info",
    )
    uvicorn.run('main:app', **server_options)