监控输出到ES错误日志告警

复杂方式

服务端


  • main.py 主程序

# DOCKER测试环境: docker run -it --rm --name test_check_es  python:3.9 bash
# 环境依赖PY3.7+: pip install fastapi uvicorn loguru aiogram
# DOCKER镜像打包: docker build --no-cache -t check_es .
# DOCKER容器启动: docker run -d --net=host -e "TZ=Asia/Shanghai"  -e 'BOT_TOKEN=TELEGRAM_TOKEN' --name check_es check_es
# 请求测试, JSON数据格式中,message 字段必须存在: curl -XPOST  -d '{"key1": "value1","key2": "value2","message": "错误信息",}' -H "Content-Type: application/json" http://192.168.0.134:8000/alter/<chat_id> 

from fastapi import FastAPI, Request,BackgroundTasks, HTTPException
from pydantic import BaseModel, Field
from loguru import logger
from aiogram import Bot, Dispatcher
import sys,os

# 创建 FastAPI 实例
app = FastAPI()



# 从环境变量中获取字典数据
BOT_TOKEN = os.getenv('BOT_TOKEN'None)
if BOT_TOKEN is None:
    raise ValueError



# 自定义日志格式
log_format = "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"

# 配置日志记录器
logger.remove()  # 移除默认处理器
logger.add(sys.stdout, format=log_format, level="INFO")  # 添加新的处理器并指定格式和日志级别


# 创建一个 Pydantic 模型来动态接收 JSON 数据
# class DynamicItem(BaseModel):
#     user: str = Field(default=None, description="Optional user name to override default", examples='TG user name')


# 创建 Telegram Bot 实例, 必须全局
bot = Bot(token=BOT_TOKEN)
dp = Dispatcher()
MAX_MESSAGE_LENGTH = 4096 # Telegram API 限制


# 定义 POST 接口,接收 JSON 数据,并处理后台任务发送 Telegram 消息
@app.post("/alter/{chat_id}")
async def dynamic_alter(request: Request, chat_id: int, background_tasks: BackgroundTasks):
      item = await request.json()

      continue_str = 'message'
      is_message = item.get(continue_str, None)

      if is_message is None:
          raise HTTPException(status_code=400, detail='Parameter error; Tip: message is must be')

      notification_message = "Elasticsearch 日志告警 🐟🐟🐟:\n\n"
      for key, value in item.items():
          if key == continue_str: # 过滤不展示用户,临时剔除 message 信息,计算TG信息长度
            continue
          notification_message += f"{key.upper()}{value}\n"

      # 计算 TG text 信息剩余长度, 优化 10 字符
      count_notification_message = len(notification_message)
      loss_count_notification_message = MAX_MESSAGE_LENGTH - count_notification_message - len(continue_str) - 10
      notification_message += f"MESSAGE: {item['message'][:loss_count_notification_message]}\n"

      # 异步发送 Telegram 消息
      background_tasks.add_task(send_telegram_message, chat_id, notification_message)

      logger.info(f"Received JSON data: {item}")
      return {"message": "Dynamic JSON data received successfully and notification queued for sending"}



# 后台任务函数,用来发送 Telegram 消息
async def send_telegram_message(chat_id: str, message: str):
    try:
        # 发送消息
        await bot.send_message(chat_id=chat_id, text=message)
        logger.info(f"Message sent to chat ID: {chat_id}")
    except Exception as e:
        logger.error(f"Failed to send message to chat ID {chat_id}{str(e)}")

    finally:
      await bot.close()


# 运行 FastAPI 应用
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

# uvicorn main:app --reload  
# OR 
# python3 main.py

  • Dockerfile

FROM python:3.9-slim
RUN pip install --upgrade pip
RUN pip install fastapi uvicorn loguru aiogram --no-cache-dir
COPY main.py main.py

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

客户端

Logstash 数据处理引擎 Logstash-http-output 插件, Docker 启动 Logstash 7.8 方式默认已安装

  • logstash.conf

output {
  # ...其他输出存储略

  # stdout { codec => "rubydebug" }

  # 判断自定义的 fied 类型不是 ["nginx","nginx-proxy"],并且 信息中含有 error 错误字符 
  if "error" in [message] and [server_type] not in ["nginx","nginx-proxy"] {
    http {
      url ="http://<服务端地址>:8000/alter/<You_Telegram_Chat_Id>"
      http_method ="post"
      format ="json"
      content_type ="application/json"
      headers => {
        "Content-Type" => "application/json"
      }
   }

}

简单方式

## Logstash 直接发送到 TG 群组
output {
  # ...其他输出存储略

  # stdout { codec => "rubydebug" }


  if "error" in [message] {
    http {
      url ="https://api.telegram.org/bot<your_bot_token>/sendMessage"
      http_method ="post"
      format ="json"
      content_type ="application/json"
      headers => {
        "Content-Type" => "application/json"
      }
      mapping => {
        "chat_id" => "<your_chat_id>"
        "text" => "Error log detected: %{[message]}"    # 如果 message 字段错误信息过长,无法发送
      }
    }
  }

}

告警消息示例图

Last updated