# Alerting on Error Logs Shipped to ES

## Advanced approach <a href="#h" id="h"></a>

### Server <a href="#h-1" id="h-1"></a>

***

* main.py (main program)

```python
# Docker test environment: docker run -it --rm --name test_check_es  python:3.9 bash
# Dependencies (Python 3.7+): pip install fastapi uvicorn loguru aiogram
# Build the Docker image: docker build --no-cache -t check_es .
# Start the Docker container: docker run -d --net=host -e "TZ=Asia/Shanghai"  -e 'BOT_TOKEN=TELEGRAM_TOKEN' --name check_es check_es
# Test request (the JSON body must contain a message field): curl -XPOST  -d '{"key1": "value1","key2": "value2","message": "error text"}' -H "Content-Type: application/json" http://192.168.0.134:8000/alter/<chat_id>

from fastapi import FastAPI, Request,BackgroundTasks, HTTPException
from pydantic import BaseModel, Field
from loguru import logger
from aiogram import Bot, Dispatcher
import sys,os

# Create the FastAPI instance
app = FastAPI()



# Read the Telegram bot token from the environment
BOT_TOKEN = os.getenv('BOT_TOKEN', None)
if BOT_TOKEN is None:
    raise ValueError("The BOT_TOKEN environment variable must be set")



# Custom log format
log_format = "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"

# Configure the logger
logger.remove()  # Remove the default handler
logger.add(sys.stdout, format=log_format, level="INFO")  # Add a new handler with the custom format and log level


# Pydantic model for receiving the JSON data dynamically (currently unused)
# class DynamicItem(BaseModel):
#     user: str = Field(default=None, description="Optional user name to override default", examples='TG user name')


# Create the Telegram Bot instance; it must be global
bot = Bot(token=BOT_TOKEN)
dp = Dispatcher()
MAX_MESSAGE_LENGTH = 4096  # Telegram API limit


# POST endpoint: accepts JSON and queues a background task that sends the Telegram message
@app.post("/alter/{chat_id}")
async def dynamic_alter(request: Request, chat_id: int, background_tasks: BackgroundTasks):
    item = await request.json()

    continue_str = 'message'
    is_message = item.get(continue_str, None)

    if is_message is None:
        raise HTTPException(status_code=400, detail="Parameter error: the 'message' field is required")

    notification_message = "Elasticsearch log alert 🐟🐟🐟:\n\n"
    for key, value in item.items():
        if key == continue_str:  # Skip message for now; it is appended last, once the remaining length is known
            continue
        notification_message += f"{key.upper()}: {value}\n"

    # Remaining room for the message text, with a 10-character safety margin
    count_notification_message = len(notification_message)
    loss_count_notification_message = max(0, MAX_MESSAGE_LENGTH - count_notification_message - len(continue_str) - 10)
    # str() guards against a non-string message value, which cannot be sliced
    notification_message += f"MESSAGE: {str(is_message)[:loss_count_notification_message]}\n"

    # Send the Telegram message asynchronously
    background_tasks.add_task(send_telegram_message, chat_id, notification_message)

    logger.info(f"Received JSON data: {item}")
    return {"message": "Dynamic JSON data received successfully and notification queued for sending"}



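# Background task that sends the Telegram message
async def send_telegram_message(chat_id: int, message: str):
    try:
        await bot.send_message(chat_id=chat_id, text=message)
        logger.info(f"Message sent to chat ID: {chat_id}")
    except Exception as e:
        logger.error(f"Failed to send message to chat ID {chat_id}: {str(e)}")
    # Do not call bot.close() here: the bot is a global shared by all requests,
    # and in aiogram 3.x bot.close() invokes the Telegram Bot API "close" method
    # rather than releasing the local HTTP session.


# Release the bot's HTTP session once, when the application shuts down
@app.on_event("shutdown")
async def close_bot_session():
    await bot.session.close()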


# Run the FastAPI application
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

# uvicorn main:app --reload  
# OR 
# python3 main.py
```
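The length handling in the endpoint above can be isolated into a pure function, which makes it easier to check without starting the server. The following is a minimal sketch, not part of the original program: `build_notification` is a hypothetical helper name, the header text is a plain-ASCII stand-in, and the sketch assumes Telegram's 4096-character limit can be approximated with Python's `len`.

```python
MAX_MESSAGE_LENGTH = 4096  # Telegram limit for a single text message


def build_notification(item: dict, limit: int = MAX_MESSAGE_LENGTH) -> str:
    """Format every field of the event and put a truncated MESSAGE line last.

    Hypothetical helper: it mirrors the endpoint's logic, but computes the
    exact room left instead of using a fixed safety margin.
    """
    header = "Elasticsearch log alert:\n\n"
    body = "".join(
        f"{key.upper()}: {value}\n"
        for key, value in item.items()
        if key != "message"
    )
    prefix, suffix = "MESSAGE: ", "\n"
    # Room left for the message text once everything else is accounted for.
    room = max(0, limit - len(header) - len(body) - len(prefix) - len(suffix))
    return f"{header}{body}{prefix}{str(item['message'])[:room]}{suffix}"


if __name__ == "__main__":
    text = build_notification({"host": "web-1", "message": "x" * 10_000})
    print(len(text))
```

If the other fields alone already exceed the limit, this sketch still returns an oversized text; it only trims the `message` value, as the endpoint does.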

***

* Dockerfile

```dockerfile
FROM python:3.9-slim
RUN pip install --upgrade pip
RUN pip install fastapi uvicorn loguru aiogram --no-cache-dir
COPY main.py main.py

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
```

### Client <a href="#h-2" id="h-2"></a>

> Logstash data-processing engine\
> The logstash-output-http plugin is installed by default when Logstash 7.8 is started with Docker

* logstash.conf

```ini
output {
  # ...other outputs omitted

  # stdout { codec => "rubydebug" }

  # Match when the custom server_type field is not one of ["nginx","nginx-proxy"] and the message contains the string "error"
  if "error" in [message] and [server_type] not in ["nginx","nginx-proxy"] {
    http {
      url => "http://<服务端地址>:8000/alter/<You_Telegram_Chat_Id>"
      http_method => "post"
      format => "json"
      content_type => "application/json"
      headers => {
        "Content-Type" => "application/json"
      }
    }
  }

}
```
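Before wiring up Logstash, it can help to reproduce the request that the `http` output is configured to send. The sketch below uses only the Python standard library and builds a POST request against the `/alter/<chat_id>` endpoint. The helper name `build_alert_request`, the base URL, the chat ID, and the event fields are all illustrative assumptions, and a real Logstash event will carry additional fields.

```python
import json
import urllib.request


def build_alert_request(base_url: str, chat_id: int, event: dict) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request for the alert endpoint."""
    if "message" not in event:
        # The server rejects bodies without a message field with HTTP 400.
        raise ValueError("event must contain a 'message' field")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/alter/{chat_id}",
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Hypothetical event; server_type is the custom field tested in logstash.conf.
    event = {"server_type": "app", "message": "error: connection refused"}
    req = build_alert_request("http://127.0.0.1:8000", 123456789, event)
    print(req.get_method(), req.full_url)
    # Sending requires the server to be running:
    # with urllib.request.urlopen(req, timeout=5) as resp:
    #     print(resp.status, resp.read().decode("utf-8"))
```

Keeping request construction separate from sending means the URL and body can be inspected without a running server or a real bot token.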

## Simple approach <a href="#h-3" id="h-3"></a>

```ini
## Logstash sends directly to a Telegram group
output {
  # ...other outputs omitted

  # stdout { codec => "rubydebug" }


  if "error" in [message] {
    http {
      url => "https://api.telegram.org/bot<your_bot_token>/sendMessage"
      http_method => "post"
      format => "json"
      content_type => "application/json"
      headers => {
        "Content-Type" => "application/json"
      }
      mapping => {
        "chat_id" => "<your_chat_id>"
        "text" => "Error log detected: %{[message]}"    # Sending fails if the message field is too long
      }
    }
  }

}
```
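The simple approach cannot deliver a `message` that pushes the text past Telegram's limit. One possible workaround is to split the text into several messages, which would have to happen in a relay service such as the server in the advanced approach, not in this Logstash config. The sketch below is an illustration under that assumption: `split_for_telegram` and `build_payloads` are hypothetical helpers, and the 4096-character limit is approximated with Python's `len`.

```python
from typing import Dict, List

MAX_MESSAGE_LENGTH = 4096  # Telegram limit for a single text message


def split_for_telegram(text: str, limit: int = MAX_MESSAGE_LENGTH) -> List[str]:
    """Cut text into consecutive chunks of at most `limit` characters."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    return [text[start:start + limit] for start in range(0, len(text), limit)]


def build_payloads(chat_id: str, text: str) -> List[Dict[str, str]]:
    """Build one sendMessage JSON body (chat_id, text) per chunk."""
    return [{"chat_id": chat_id, "text": chunk} for chunk in split_for_telegram(text)]


if __name__ == "__main__":
    payloads = build_payloads("<your_chat_id>", "Error log detected: " + "x" * 10_000)
    print([len(p["text"]) for p in payloads])
```

Splitting keeps the whole log line at the cost of several notifications per event; truncating, as the server in the advanced approach does, keeps one notification per event.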

## Sample alert message

<figure><img src="https://2134947750-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FvILwbD2PrkBCkSitM3m4%2Fuploads%2FToUi9f99dBcSKyDXIETC%2Fimage.png?alt=media&#x26;token=fbf90fd2-8059-44a9-97b8-142c921e317c" alt=""><figcaption></figcaption></figure>


