监控输出到ES错误日志告警
复杂方式
服务端
main.py 主程序
# DOCKER测试环境: docker run -it --rm --name test_check_es python:3.9 bash
# 环境依赖PY3.7+: pip install fastapi uvicorn loguru aiogram
# DOCKER镜像打包: docker build --no-cache -t check_es .
# DOCKER容器启动: docker run -d --net=host -e "TZ=Asia/Shanghai" -e 'BOT_TOKEN=TELEGRAM_TOKEN' --name check_es check_es
# 请求测试, JSON数据格式中,message 字段必须存在: curl -XPOST -d '{"key1": "value1","key2": "value2","message": "错误信息",}' -H "Content-Type: application/json" http://192.168.0.134:8000/alter/<chat_id>
from fastapi import FastAPI, Request,BackgroundTasks, HTTPException
from pydantic import BaseModel, Field
from loguru import logger
from aiogram import Bot, Dispatcher
import sys,os
# 创建 FastAPI 实例
app = FastAPI()
# 从环境变量中获取字典数据
BOT_TOKEN = os.getenv('BOT_TOKEN', None)
if BOT_TOKEN is None:
raise ValueError
# 自定义日志格式
log_format = "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
# 配置日志记录器
logger.remove() # 移除默认处理器
logger.add(sys.stdout, format=log_format, level="INFO") # 添加新的处理器并指定格式和日志级别
# 创建一个 Pydantic 模型来动态接收 JSON 数据
# class DynamicItem(BaseModel):
# user: str = Field(default=None, description="Optional user name to override default", examples='TG user name')
# 创建 Telegram Bot 实例, 必须全局
bot = Bot(token=BOT_TOKEN)
dp = Dispatcher()
MAX_MESSAGE_LENGTH = 4096 # Telegram API 限制
# 定义 POST 接口,接收 JSON 数据,并处理后台任务发送 Telegram 消息
@app.post("/alter/{chat_id}")
async def dynamic_alter(request: Request, chat_id: int, background_tasks: BackgroundTasks):
item = await request.json()
continue_str = 'message'
is_message = item.get(continue_str, None)
if is_message is None:
raise HTTPException(status_code=400, detail='Parameter error; Tip: message is must be')
notification_message = "Elasticsearch 日志告警 🐟🐟🐟:\n\n"
for key, value in item.items():
if key == continue_str: # 过滤不展示用户,临时剔除 message 信息,计算TG信息长度
continue
notification_message += f"{key.upper()}: {value}\n"
# 计算 TG text 信息剩余长度, 优化 10 字符
count_notification_message = len(notification_message)
loss_count_notification_message = MAX_MESSAGE_LENGTH - count_notification_message - len(continue_str) - 10
notification_message += f"MESSAGE: {item['message'][:loss_count_notification_message]}\n"
# 异步发送 Telegram 消息
background_tasks.add_task(send_telegram_message, chat_id, notification_message)
logger.info(f"Received JSON data: {item}")
return {"message": "Dynamic JSON data received successfully and notification queued for sending"}
# 后台任务函数,用来发送 Telegram 消息
async def send_telegram_message(chat_id: str, message: str):
try:
# 发送消息
await bot.send_message(chat_id=chat_id, text=message)
logger.info(f"Message sent to chat ID: {chat_id}")
except Exception as e:
logger.error(f"Failed to send message to chat ID {chat_id}: {str(e)}")
finally:
await bot.close()
# 运行 FastAPI 应用
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
# uvicorn main:app --reload
# OR
# python3 main.py
Dockerfile
FROM python:3.9-slim
RUN pip install --upgrade pip
RUN pip install fastapi uvicorn loguru aiogram --no-cache-dir
COPY main.py main.py
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
客户端
Logstash 数据处理引擎 Logstash-http-output 插件, Docker 启动 Logstash 7.8 方式默认已安装
logstash.conf
output {
# ...其他输出存储略
# stdout { codec => "rubydebug" }
# 判断自定义的 fied 类型不是 ["nginx","nginx-proxy"],并且 信息中含有 error 错误字符
if "error" in [message] and [server_type] not in ["nginx","nginx-proxy"] {
http {
url => "http://<服务端地址>:8000/alter/<You_Telegram_Chat_Id>"
http_method => "post"
format => "json"
content_type => "application/json"
headers => {
"Content-Type" => "application/json"
}
}
}
简单方式
## Logstash 直接发送到 TG 群组
output {
# ...其他输出存储略
# stdout { codec => "rubydebug" }
if "error" in [message] {
http {
url => "https://api.telegram.org/bot<your_bot_token>/sendMessage"
http_method => "post"
format => "json"
content_type => "application/json"
headers => {
"Content-Type" => "application/json"
}
mapping => {
"chat_id" => "<your_chat_id>"
"text" => "Error log detected: %{[message]}" # 如果 message 字段错误信息过长,无法发送
}
}
}
}
告警消息示例图
Last updated