Nginx 业务接口触发拉黑IP

'''
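## pip3 install fastapi==0.108.0 pyTelegramBotAPI==4.14.1 telebot==0.0.5 aiofiles==23.2.1 asyncio==3.4.3 loguru==0.7.2
## NOTE: this script also imports uvicorn, which is missing from the line above and must be installed too.
## NOTE: asyncio is part of the Python 3 standard library, so the asyncio==3.4.3 pin should not be needed.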
export FASTAPI_BLACKLIST_PATH=/etc/nginx/blacklist.conf
export FASTAPI_NGINX_CMD_PATH=/usr/sbin/nginx
export FASTAPI_TG_ID=-xxxxx
export FASTAPI_TG_TOKEN=xxx:xxxx
export FASTAPI_TG_ENABLED=true
'''



from fastapi import FastAPI,status, BackgroundTasks, Depends, HTTPException
from fastapi.responses import Response,JSONResponse
from typing import Union
from fastapi import Query
from datetime import datetime
from loguru import logger
import aiofiles,asyncio,uvicorn,time,telebot,ipaddress,os,sys



#  ----------------------------- global log  ---------------------------------------
# Logging configuration handed to uvicorn (dictConfig schema, version 1).
# Both the "default" and the "access" handler write to ./main.log through a
# TimedRotatingFileHandler; they differ only in the formatter they use.
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {
            "()": "uvicorn.logging.DefaultFormatter",
            "fmt": "%(levelprefix)s %(message)s",
            "use_colors": None,
        },
        "access": {
            "()": "uvicorn.logging.AccessFormatter",
            "fmt": '%(asctime)s %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s',
        },
    },
    "handlers": {
        # Each handler is named after the formatter it is bound to.
        handler_name: {
            "formatter": handler_name,
            "class": "logging.handlers.TimedRotatingFileHandler",
            "filename": "./main.log",
        }
        for handler_name in ("default", "access")
    },
    "loggers": {
        "": {"handlers": ["default"], "level": "INFO"},
        "uvicorn.error": {"level": "INFO"},
        "uvicorn.access": {"handlers": ["access"], "level": "INFO", "propagate": False},
    },
}

# ----------------------------- global env  ---------------------------------------
def parse_bool_flag(raw: Union[str, None], default: bool = False) -> bool:
    """Interpret an environment-variable string as a boolean.

    Args:
        raw: Value as returned by ``os.getenv`` (``None`` when the variable is unset).
        default: Result to use when ``raw`` is ``None``.

    Returns:
        ``False`` for an empty value or one of ``0``/``false``/``no``/``off``
        (case-insensitive, surrounding whitespace ignored); ``True`` for any
        other value, which keeps every previously-truthy setting truthy.
    """
    if raw is None:
        return default
    return raw.strip().lower() not in {"", "0", "false", "no", "off"}


# Path of the nginx binary used for `-t` and `-s reload`.
fastapi_nginx_path = os.getenv("FASTAPI_NGINX_CMD_PATH", '/usr/sbin/nginx')
# File that receives the "<ip> 0;" deny entries.
fastapi_blacklist_path = os.getenv("FASTAPI_BLACKLIST_PATH", 'deny_list.txt')
# Telegram chat id and bot token used for notifications.
fastapi_tg_id = os.getenv("FASTAPI_TG_ID", '')
fastapi_tg_token = os.getenv("FASTAPI_TG_TOKEN", '')
# os.getenv returns a string, and any non-empty string (including "false")
# is truthy, so the raw value must be parsed before being used as a switch.
fastapi_tg_enabled = parse_bool_flag(os.getenv("FASTAPI_TG_ENABLED"))
# Text appended after "<ip> " to form one blacklist line.
suffix = "0;\n"
debug_env = False

def notify_now():
    """Return the current local time as a ``YYYY-MM-DD HH:MM:SS`` string."""
    return f"{datetime.now():%Y-%m-%d %H:%M:%S}"
# --------------------------- add current path to env for pyinstaller ---------------------------

if not debug_env:
    # Outside debug mode, make the current working directory importable by
    # appending it to the module search path.
    root_path = os.getcwd()
    sys.path.append(root_path)


# ----------------------------- Telegram notify  ---------------------------------------
async def Telegram(TG_ID, TG_TOKEN, text):
    """Send ``text`` to a Telegram chat, logging (not raising) on failure.

    Args:
        TG_ID: Target chat id.
        TG_TOKEN: Bot token used to build the ``telebot.TeleBot`` client.
        text: Message body; sent with ``parse_mode="Markdown"``.

    Returns:
        None. Any exception raised while sending is logged via ``logger.error``
        and swallowed, so a notification failure never breaks the caller.
    """
    try:
        bot = telebot.TeleBot(TG_TOKEN)
        # send_message is a plain (non-awaitable) call that performs network
        # I/O; run it in a worker thread so the event loop is not blocked
        # while the request to Telegram is in flight.
        await asyncio.to_thread(bot.send_message, TG_ID, text, parse_mode="Markdown")
    except Exception as e:
        logger.error(str(e))

async def sendToTelegram(msg):
    """Forward ``msg`` to Telegram when notifications are switched on.

    When ``fastapi_tg_enabled`` is falsy nothing is sent and a warning is
    logged instead.
    """
    if not fastapi_tg_enabled:
        logger.warning('You not enabled telegram, Please set env FASTAPI_TG_ENABLED for true')
        return
    await Telegram(fastapi_tg_id, fastapi_tg_token, msg)
# ----------------------------- initialization  ---------------------------------------
from contextlib import asynccontextmanager

@asynccontextmanager
async def initialization(app: FastAPI):
    """Lifespan handler: startup checks and notification, then shutdown notification.

    On startup it logs the effective configuration, verifies (outside debug
    mode) that the nginx binary and the blacklist file exist, and announces
    the start on Telegram. After the ``yield`` it logs and announces shutdown.

    Args:
        app: The FastAPI application this lifespan is attached to (unused here).

    Raises:
        SystemExit: With status 1 when, outside debug mode, the nginx binary
            or the blacklist file is missing.
    """
    # startup loading
    start_msg = f"{notify_now()} startup app fastapi dir: {os.getcwd()} ===>>> 新交易所"
    logger.info(start_msg)

    # The bot token is a credential: log only whether it is set, never its value.
    masked_token = '***' if fastapi_tg_token else ''

    logger.success("====================== ENV ==========================================")
    logger.info(f"    export FASTAPI_BLACKLIST_PATH={fastapi_blacklist_path}")
    logger.info(f"    export FASTAPI_NGINX_CMD_PATH={fastapi_nginx_path}")
    logger.info(f"    export FASTAPI_TG_ID={fastapi_tg_id}")
    logger.info(f"    export FASTAPI_TG_TOKEN={masked_token}")
    logger.info(f"    export FASTAPI_TG_ENABLED={fastapi_tg_enabled}")
    logger.success("=====================================================================")

    if not debug_env:
        if not os.path.exists(fastapi_nginx_path):
            logger.error(f"{fastapi_nginx_path} not found, Are you sure already install nginx?")
            sys.exit(1)
        if not os.path.exists(fastapi_blacklist_path):
            logger.error(f"{fastapi_blacklist_path} not found, Please create this path name file, default current path")
            sys.exit(1)

    await sendToTelegram(start_msg)

    yield

    # shutdown loading
    # Build the message here, not at startup, so its timestamp is the
    # actual shutdown time.
    end_msg = f"{notify_now()} shutdown app fastapi dir: {os.getcwd()} ===>>> 新交易所"
    logger.info(end_msg)
    await sendToTelegram(end_msg)

    

# ----------------------------- Log input format --------------------------------------------------

if debug_env:
    # Debug mode only: add a loguru file sink under ./logs, rotated at
    # midnight and kept for five days.
    log_path = os.path.join(os.getcwd(), 'logs')
    if not os.path.exists(log_path):
        os.mkdir(log_path)
    log_path_error = os.path.join(log_path, time.strftime("%Y-%m-%d") + '_error.log')
    logger.add(
        log_path_error,
        rotation="00:00",
        retention="5 days",
        enqueue=True,
    )


# ----------------------------- Return interface encapsulation --------------------------------------
def Reponse(*, code=200, msg="success", data: Union[list, dict, str] = None) -> Response:
    """Wrap a result in the service's standard JSON envelope.

    The HTTP status is always 200; the outcome is carried by the ``code``
    field of the body.

    Args:
        code: Application-level result code placed in the body.
        msg: Human-readable message placed in the body.
        data: Optional payload placed in the body.

    Returns:
        A ``JSONResponse`` whose body has the keys ``code``, ``msg``, ``data``
        and ``time`` (current timestamp from ``notify_now``).
    """
    envelope = {
        'code': code,
        'msg': msg,
        'data': data,
        'time': notify_now(),
    }
    return JSONResponse(status_code=status.HTTP_200_OK, content=envelope)

# ------------------------------ validate params-------------------------------------------------------

def validate_ip(ip: str = Query(...)):
    """Dependency that parses the required ``ip`` query parameter.

    Returns:
        The object produced by ``ipaddress.ip_address(ip)``.

    Raises:
        HTTPException: Status 400 when ``ip`` is not a valid IP address.
    """
    try:
        parsed_address = ipaddress.ip_address(ip)
    except ValueError:
        raise HTTPException(status_code=400, detail=f"{ip} Invalid IP Address")
    else:
        return parsed_address


# ----------------------------- Reload nginx service -----------------------------------------------
    
async def _run_nginx(*args: str) -> int:
    """Run the configured nginx binary with ``args`` and return its exit code."""
    # Executed directly (no shell) and awaited, so the event loop stays free
    # while nginx runs and the configured path is never shell-interpreted.
    process = await asyncio.create_subprocess_exec(fastapi_nginx_path, *args)
    return await process.wait()


async def reload_nginx():
    """Validate the nginx configuration and, if it is valid, reload nginx.

    Runs ``nginx -t`` first; ``nginx -s reload`` is only issued when the
    check exits with 0. Every outcome is logged and forwarded through
    ``sendToTelegram``. Exceptions (including a failure to start the nginx
    process) are logged and reported, never propagated.
    """
    check_msg = f"{notify_now()} {fastapi_nginx_path} -t ===>>> "
    reload_msg = f"{notify_now()} {fastapi_nginx_path} -s reload ===>>> "
    success_msg = "success"
    error_msg = "error"
    log_hint = f"\n Please check the current program log: {os.getcwd()}/main.log"

    try:
        if await _run_nginx('-t') != 0:
            # A failed config check is an error, so it is logged at ERROR level.
            logger.error(f"{check_msg} {error_msg}")
            await sendToTelegram(f"{check_msg} {error_msg}{log_hint}")
            return

        logger.info(f"{check_msg} {success_msg}")
        await sendToTelegram(f"{check_msg} {success_msg}")

        if await _run_nginx('-s', 'reload') == 0:
            logger.info(f"{reload_msg} {success_msg}")
            await sendToTelegram(f"{reload_msg} {success_msg}")
        else:
            logger.error(f"{reload_msg}  {error_msg}")
            await sendToTelegram(f"{reload_msg} {error_msg}{log_hint}")

    except Exception as e:
        logger.error(str(e))
        await sendToTelegram(f"{notify_now()} Failed to trigger blacklist operation: {str(e)}, {log_hint}")



    # finally:
# --------------------------------------------------------------------------------------------------
    





# ASGI application object; `initialization` is registered as its lifespan
# handler (code before its `yield` is the startup part, code after it the
# shutdown part).
app = FastAPI(lifespan=initialization)



async def write_file(file_path: str, data: str):
    """Overwrite ``file_path`` with ``data``, encoded as UTF-8.

    Args:
        file_path: File to truncate and rewrite.
        data: Complete new content of the file.
    """
    logger.info(f'cover {file_path} file')
    async with aiofiles.open(file_path, "w", encoding="utf-8") as file:
        # data is a single string: write() stores it in one call, whereas
        # writelines() would iterate over it one character at a time.
        await file.write(data)


# Delete deny ip
async def acl_allow(file_path: str, data: str):
    """Remove every blacklist line for ``data`` from ``file_path``.

    A line matches when it equals ``f"{data} {suffix}"`` exactly. When at
    least one line matches, the file is rewritten without the matches.

    Args:
        file_path: Blacklist file to edit.
        data: Address whose deny entries should be deleted.

    Returns:
        ``(True, message)`` when entries were removed and the file rewritten;
        ``(False, message)`` when no matching entry exists (file untouched).
    """
    allow_msg = f'{notify_now()}  acl delete {data} from ===>>> {file_path}'
    logger.info(allow_msg)
    async with aiofiles.open(file_path, "r", encoding="utf-8") as fp:
        contents = await fp.readlines()

    # Drop all matching lines in one pass instead of calling list.remove()
    # repeatedly until it raises.
    entry = f"{data} {suffix}"
    remaining = [line for line in contents if line != entry]

    if len(remaining) == len(contents):
        not_found_msg = allow_msg + " warning ===>>> not in blocklist"
        logger.warning(not_found_msg)
        return False, not_found_msg

    await write_file(file_path=file_path, data=''.join(remaining))
    return True, allow_msg + " success"


# Add deny ip
async def acl_deny(file_path: str, data: str):
    """Append a deny entry for ``data`` to ``file_path`` unless already present.

    The entry written is ``f"{data} {suffix}"``.

    Args:
        file_path: Blacklist file to edit.
        data: Address to block.

    Returns:
        ``(True, message)`` when the entry was appended; ``(False, message)``
        when it already exists or when reading/writing the file failed (the
        error text is included in the message and logged).
    """
    deny_msg = f'{notify_now()} acl deny {data}  to ===>>> {file_path}'
    logger.info(deny_msg)
    entry = f"{data} {suffix}"

    try:
        async with aiofiles.open(file_path, "r", encoding="utf-8") as fp:
            contents = await fp.readlines()

        if entry in contents:
            duplicate_msg = deny_msg + " warning ===>>> already in blocklist"
            logger.warning(duplicate_msg)
            return False, duplicate_msg

        # If the file's last line has no trailing newline, start a new line
        # first; otherwise the entry would be glued onto that line and could
        # no longer be matched as a whole line later.
        needs_newline = bool(contents) and not contents[-1].endswith("\n")
        async with aiofiles.open(file_path, "a", encoding="utf-8") as file:
            await file.write(("\n" if needs_newline else "") + entry)
        return True, deny_msg + " success"
    except Exception as e:
        deny_msg = deny_msg + f" error ===>>> {str(e)}"
        # Record the failure locally as well; the caller only forwards it.
        logger.error(deny_msg)
        return False, deny_msg



@app.get("/acl/deny")
async def deny(background_tasks: BackgroundTasks,  ip: str = Depends(validate_ip) ):
    """GET /acl/deny: add ``ip`` to the blacklist and schedule an nginx reload.

    ``ip`` is checked by the ``validate_ip`` dependency before this handler
    runs (original author's note: the validation adds latency).

    Returns:
        The standard envelope with ``code=200`` when the entry was added, or
        ``code=400`` and the failure message otherwise.
    """
    # A single coroutine needs no TaskGroup; awaiting it directly gives the
    # same result without wrapping errors in an ExceptionGroup.
    added, message = await acl_deny(fastapi_blacklist_path, str(ip))

    if fastapi_tg_enabled:
        # Queued as a background task so the notification does not delay the response.
        background_tasks.add_task(sendToTelegram, message)

    if added:
        background_tasks.add_task(reload_nginx)
        return Reponse()

    return Reponse(code=400, msg=message)


@app.get("/acl/allow")
async def allow(background_tasks: BackgroundTasks, ip: str = Query(None)):
    """GET /acl/allow: remove ``ip`` from the blacklist and schedule an nginx reload.

    Returns:
        The standard envelope with ``code=200`` when entries were removed,
        ``code=400`` when ``ip`` is missing or empty, or ``code=404`` when no
        matching entry exists.
    """
    # NOTE(review): unlike /acl/deny, ip is not passed through validate_ip
    # here. Confirm whether that is deliberate before tightening it.
    if not ip:
        # Without this guard a missing parameter would be searched for as the
        # literal text "None".
        return Reponse(code=400, msg="Error: missing required query parameter ===>>> ip")

    removed, message = await acl_allow(fastapi_blacklist_path, ip)

    if fastapi_tg_enabled:
        # Queued as a background task so the notification does not delay the response.
        background_tasks.add_task(sendToTelegram, message)

    if removed:
        background_tasks.add_task(reload_nginx)
        return Reponse()
    return Reponse(code=404, msg=f"Error: Not Found ===>>> {message}" )




if __name__ == "__main__":
    # Start uvicorn when the file is executed as a script. The import string
    # 'main:app' presumably requires this file to be saved as main.py.
    uvicorn.run(
        'main:app',
        host='0.0.0.0',
        port=8000,
        reload=debug_env,
        workers=1,
        log_config=LOGGING_CONFIG,
        log_level="info",
    )

Last updated