Python使用代理检测域名状态
🏦 Python3 环境
pip install aiohttp requests playwright jinja2 fake_useragent yaml
config.yaml
proxies:
- http://user01:password@ip:port
- http://user02:password@ip:port
domains:
- https://baidu.com
- https://qq.com
# 检测 H5 使用
SLOW_THRESHOLD: 3000
# 检查状态码使用
ALLOW_STATUS_CODE:
- 200
# 检查状态码使用
telegram:
enable: true
bot_token: 000000:000000000000
chat_id: -000000
# ========= 文件信息 =========
# Author: 000
# Email: 000@example.com
# Date: 2025-04-18 16:00:00
# LastEditTime: 2025-04-18 16:00:00
# Description: 通过代理检查域名返回状态码是否异常
# =========================
# conda create -n test-oss python=3.9.9 -c conda-forge
# conda activate
# pip install aiohttp httpx datetime fake-useragent urllib3 pyyaml
import asyncio
from urllib.parse import urlparse
import aiohttp
import time
import logging
import os
import httpx
from datetime import datetime
from logging.handlers import RotatingFileHandler
import urllib3
import yaml
from typing import Dict, List
import traceback
# from fake_useragent import UserAgent
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 日志设置
LOG_DIR = "logs"
SCREENSHOT_DIR = os.path.join(LOG_DIR, "screenshots")
REPORT_DIR = os.path.join(LOG_DIR, "reports")
os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
os.makedirs(REPORT_DIR, exist_ok=True)
log_filename = os.path.join(LOG_DIR, f"检测日志_{datetime.now().strftime('%Y-%m-%d')}.log")
log_handler = RotatingFileHandler(log_filename, maxBytes=10*1024*1024, backupCount=5, encoding="utf-8")
log_formatter = logging.Formatter("[%(asctime)s] [%(levelname)s] %(message)s", datefmt='%Y-%m-%d %H:%M:%S')
log_handler.setFormatter(log_formatter)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(log_handler)
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_formatter)
logger.addHandler(console_handler)
# ============================================================
# 读取 YAML
# ============================================================
def load_proxies_from_yaml(file_path):
with open(file_path, 'r') as f:
data = yaml.safe_load(f)
return data.get('proxies', []), data.get('domains', []), data.get("ALLOW_STATUS_CODE", []), data.get('telegram', {})
# 用例
PROXY_LIST, DOMAIN_LIST, ALLOW_STATUS_CODE, telegram = load_proxies_from_yaml('config.yaml')
# print(PROXY_LIST, DOMAIN_LIST, SLOW_THRESHOLD)
# print(telegram)
# ============================================================
# 发送 Telegram 异步函数
# ============================================================
TELEGRAM_BOT_TOKEN = telegram['bot_token'] if telegram['enable'] else None
TELEGRAM_CHAT_ID = telegram['chat_id'] if telegram['enable'] else None
async def send_telegram_content_async(message):
"""
异步发送内容到 Telegram(支持发送HTML文件)
:param message: 消息文本
:param file_path: 可选文件路径
:param html_mode: 是否启用 HTML 文件发送模式
"""
try:
base_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}"
headers = {
"Content-Type": "application/json"
}
async with aiohttp.ClientSession() as session:
send_message_url = f"{base_url}/sendMessage"
payload = {
"chat_id": TELEGRAM_CHAT_ID,
"text": message,
}
async with session.post(send_message_url, json=payload, headers=headers) as resp:
if resp.status == 200:
logger.info("✅ 已成功发送消息")
else:
logger.error(f"❌ Telegram发送失败,状态码: {resp.status}")
return resp.status == 200
except Exception as e:
logger.error(f"❌ Telegram发送异常: {str(e)}")
return False
# ============================================================
# 检查单个域名的状态
# ============================================================
async def check_domain_status(domain: str, proxy: str) -> Dict:
"""检查单个域名的状态"""
transport = httpx.AsyncHTTPTransport(proxy=proxy, verify=False)
# ua = UserAgent()
# headers = {
# "User-Agent": ua.random
# }
async with httpx.AsyncClient(transport=transport, timeout=10) as client:
try:
response = await client.get(domain)
# print("✅ 状态码:", response.status_code)
# print("✅ 内容:", response.text[:300])
logger.info(f"ℹ️使用代理地址: {proxy}, 测试访问域名: {domain}, 状态码: {response.status_code}, ")
try:
ipGetResponse = await client.get("https://ip.me")
logger.info(f"ℹ️使用代理地址: {proxy}, 获取代理公网IP: {ipGetResponse.text.strip()}")
# 获取代理 IP 的地理位置
ipZoneResponse = await client.get(f"https://ipinfo.io/{ipGetResponse.text.strip()}")
ip_info = ipZoneResponse.json()
logger.info(f'ℹ️使用代理地址: {proxy}, 定位IP区域返回: { ip_info }')
# 返回任务结果,统计
return {"domain": domain, "status_code": response.status_code, "proxy": proxy, "ip_info": ip_info}
except Exception as e:
logger.warning(f'错误信息: {str(e)}')
return {"domain": domain, "status_code": response.status_code, "proxy": proxy}
except httpx.ReadTimeout:
logger.error(f"[❌ 超时] 请求 {domain} 使用代理 {proxy} 超时")
return {"domain": domain, "status_code": "Error", "error": f"请求 {domain} 使用代理超时", "proxy": proxy}
except httpx.ProxyError as e:
logger.error(f"[❌ 代理错误] {proxy} -> {domain}: {str(e)}")
return {"domain": domain, "status_code": "Error", "error": f"{str(e)}", "proxy": proxy}
except Exception as e:
logger.error(f"[❌ 其他错误] {domain} -> {proxy}")
traceback.print_exc()
return {
"domain": domain,
"status_code": "Error",
"error": str(e) or repr(e) or "未知错误",
"proxy": proxy
}
# await send_telegram_content_async(message=f"=== 函数 get_ip_info 获取 IP 信息失败 ===\n代理地址: {proxy}\n原因: {str(e)}")
# ============================================================
# 批量检查多个域名的状态
# ============================================================
async def check_domains(domains: List[str], proxy_pool: List[str]):
"""批量检查多个域名的状态"""
tasks = []
# 循环使用所有代理
for proxy in proxy_pool:
logger.info(f"⏳当前使用代理地址 {proxy}, 开始请求状态检测")
# 使用某个区域代理,请求所有域名
for domain in domains:
task = check_domain_status(domain, proxy)
tasks.append(task)
# 异步执行所有任务
results = await asyncio.gather(*tasks)
# 输出结果
for result in results:
if result["status_code"] == "Error":
logger.error(f"❌ 检测 {result['domain']} ERROR,代理 {result['proxy']} 错误: {result['error']}")
await send_telegram_content_async(message=f"检测地址: {result['domain']} \n失败原因: {result['error'] } \n使用代理: {result['proxy']} \n") if telegram['enable'] else None
# 当异常过多过 1s 之后停滞一会, 防止超过 tg 最大消息数
time.sleep(2)
else:
# 判断是否允许的状态
if result['status_code'] in ALLOW_STATUS_CODE:
logger.info(f"✅ 检测 {result['domain']} SUCCESS, 状态码: {result['status_code']},代理 {result['proxy']}")
else:
if result['ip_info']:
message=f"检测地址: {result['domain']}\n状态码: {result['status_code']} \n失败原因: 不在已定义允许的状态码内 \n使用代理: {result['proxy']} \n代理IP: {result['ip_info']['ip']} \n国家:{result['ip_info']['country']} \n城市: {result['ip_info']['city']} \n区域: {result['ip_info']['region']}"
else:
message=f"检测地址: {result['domain']}\n状态码: {result['status_code']} \n失败原因: 不在已定义允许的状态码内 \n使用代理: {result['proxy']}"
logger.error(message)
await send_telegram_content_async(message) if telegram['enable'] else None
# 当异常过多过 1s 停滞一会, 防止超过 tg 最大消息数
time.sleep(2)
async def main():
await check_domains(DOMAIN_LIST, PROXY_LIST)
# ===========================================================================
if __name__ == "__main__":
asyncio.run(main())
Last updated