# coding: utf-8
# import execjs
import threading
import queue
from _env import _global, _proxies, _open, _from_file_name, _to_file_name, _error_file_name, _read_time, _startDate, _endDate, _threads
import requests
import time
import pandas as pd
import random
from fake_useragent import UserAgent
import re
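# _env.py (not shown here) is expected to define the names imported above, based on how
# they are used below: _global (a dict of account entries, each holding 'Cookie' and
# 'Cipher_Text'), _proxies (a list of proxy URLs), _open (whether proxy mode is on),
# _from_file_name / _to_file_name / _error_file_name (keyword list, output Excel, error log),
# _read_time (max random delay between requests, in seconds), _startDate / _endDate
# (query date range) and _threads (number of worker threads).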
num_of_threads = _threads  # e.g. 5 worker threads
q = queue.Queue()  # FIFO task queue with no size limit
threads = []  # list holding the worker threads (the "thread pool")
# Decrypt the search index data: the first half of `keys` is mapped onto the second half
def decryption(keys, data):
    dec_dict = {}
    for j in range(len(keys) // 2):
        dec_dict[keys[j]] = keys[len(keys) // 2 + j]
    dec_data = ''
    for k in range(len(data)):
        dec_data += dec_dict[data[k]]
    return dec_data
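# Illustrative example (made-up key, not from the live API): with keys = "abc123" the
# mapping is {'a': '1', 'b': '2', 'c': '3'}, so decryption("abc123", "cab") returns "312".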
# Fetch the index data for one keyword
def response(word, Cookie, Cipher_Text):
    scenicName = word
    dataUrl = 'https://index.baidu.com/api/SearchApi/index?area=0&word=[[%7B%22name%22:%22' + \
        scenicName + '%22,%22wordType%22:1%7D]]&startDate=' + _startDate + '&endDate=' + _endDate
    # keyUrl = 'https://index.baidu.com/Interface/ptbk?uniqid='
    header = {
        'Accept': 'application/json, text/plain, */*',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Connection': 'keep-alive',
        'Cookie': Cookie,
        'Host': 'index.baidu.com',
        'Referer': 'https://index.baidu.com/v2/main/index.html',
        'sec-ch-ua': '" Not;A Brand";v="99", "Google Chrome";v="91", "Chromium";v="91"',
        'sec-ch-ua-mobile': '?0',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
        # 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36',
        'User-Agent': UserAgent().random,
        'Cipher-Text': Cipher_Text
    }
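    # Note: the request is sent with both the Cookie and the Cipher-Text header; both
    # values come from the account entry that get_cookie() picked for this task.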
    # Check whether proxy mode is enabled
    if _open:
        # Request timeout is 30 seconds
        resData = requests.get(dataUrl, timeout=30, headers=header, proxies=get_proxy())
    else:
        resData = requests.get(dataUrl, timeout=30, headers=header)
    _res = resData.json()
    _search = re.search('uc_login_unique=.*?;', Cookie)
    # Fall back to a cookie prefix if the uc_login_unique marker is missing
    _account = _search.group() if _search else Cookie[:32]
    # print(Cookie)
    # print("Cookie in use: {0}".format(_account))
    if _res['status'] == 10018:
        print("=" * 60)
        print("\n")
        print("Cookies: {0}\nWarning: {1}\n\nWhat to do?: Pause this account for a while, then use it again!!!".format(_account, _res['message']))
        print("\n")
        print("=" * 60)
    if _res['status'] == 10000:
        print("=" * 60)
        print("\n")
        print("Cookies: {0}\nWarning: {1}!".format(_account, _res['message']))
        print("\n")
        print("=" * 60)
    return _res
    # print(resData.json())
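# Status codes as handled by worker() below: 0 is treated as success, 10002 as
# "keyword not found", and 10000 / 10018 as cookie or login problems that stop the worker.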
def get_data(_res_data):
    _start_time = _res_data['userIndexes'][0]['all']['startDate']  # start date
    _end_time = _res_data['userIndexes'][0]['all']['endDate']  # end date
    _search_word = _res_data['generalRatio'][0]['word'][0]['name']  # keyword
    _all_avg = _res_data['generalRatio'][0]['all']['avg']  # overall daily average
    _all_yoy = _res_data['generalRatio'][0]['all']['yoy']  # overall year-on-year
    _all_qoq = _res_data['generalRatio'][0]['all']['qoq']  # overall period-on-period
    _pc_avg = _res_data['generalRatio'][0]['pc']['avg']  # PC daily average
    _pc_yoy = _res_data['generalRatio'][0]['pc']['yoy']  # PC year-on-year
    _pc_qoq = _res_data['generalRatio'][0]['pc']['qoq']  # PC period-on-period
    _wise_avg = _res_data['generalRatio'][0]['wise']['avg']  # mobile daily average
    _wise_yoy = _res_data['generalRatio'][0]['wise']['yoy']  # mobile year-on-year
    _wise_qoq = _res_data['generalRatio'][0]['wise']['qoq']  # mobile period-on-period
    return [_search_word, _all_avg, _all_yoy,
            _all_qoq, _pc_avg, _pc_yoy, _pc_qoq, _wise_avg, _wise_yoy, _wise_qoq, _start_time, _end_time]
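# The list returned above becomes one Excel row; its order matches the header created in
# create_form(): keyword, overall/PC/mobile daily averages with YoY and QoQ ratios,
# then the start and end dates.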
# Create the output Excel workbook
def create_form(excel_file_name):
    form_header = ['keyword', 'overall daily avg', 'overall YoY',
                   'overall QoQ', 'PC daily avg', 'PC YoY', 'PC QoQ', 'mobile daily avg', 'mobile YoY', 'mobile QoQ', 'start date', 'end date']
    df = pd.DataFrame(columns=form_header)
    df.to_excel(excel_file_name, index=False)
# Append one row of data to the Excel file
def add_info_to_form(excel_file_name, data=[]):
    df = pd.read_excel(excel_file_name)
    df.loc[len(df)] = data  # append after the last existing row
    df.to_excel(excel_file_name, index=False)
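# Note: several worker threads call this read-modify-write concurrently; if rows ever go
# missing, guarding the function with a shared threading.Lock (not part of the original
# script) is the usual fix.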
# Write keywords that returned no result (or an error) to the error file
def error_to_txt(_txt):
    with open(_error_file_name, 'a+', encoding='utf8') as fp:
        fp.write(_txt + "\n")
def worker(i):
    while True:
        item = q.get()
        if item is None:
            print("Thread %s: found a None in the queue, time to rest ^-^" % i)
            break
        # do_work(item): the actual work happens below
        time.sleep(random.randint(0, int(_read_time)))
        # Pick a cookie
        _cookie = get_cookie()
        # Search for the keyword
        _res = response(word=str(item.replace(" ", "")), Cookie=_cookie['Cookie'], Cipher_Text=_cookie['Cipher_Text'])
        # Stop this worker if the cookie is invalid or the account is not logged in
        if _res['status'] == 10018 or _res['status'] == 10000:
            q.task_done()  # mark this task done so q.join() in main() does not block forever
            break
        # Check the returned status
        if _res['status'] == 10002:
            print("Thread %s: Baidu Index search NOTFOUND <%s>" % (i, item))
            error_to_txt(item)
        try:
            if _res['status'] == 0:
                data = get_data(_res['data'])
                add_info_to_form(_to_file_name, data)
                print("Thread %s: Baidu Index search SUCCESS <%s>" % (i, item))
        except Exception as e:
            print("Thread %s: Baidu Index search ERROR <%s> " % (i, item))
            error_to_txt(item)
        # Signal that this task is finished, then continue with the next one
        q.task_done()
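# Worker shutdown protocol: main() enqueues one None per thread after q.join() returns;
# each worker exits its loop when it pops a None from the queue.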
# Read the txt file and return all of its lines, ready for the task queue
def read_filename(fromFileName):
    _source = []
    with open(fromFileName, 'r', encoding='utf-8') as file:
        _source = file.read().splitlines()
    print('Number of keywords: <%s>' % len(_source))
    return _source
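# Input format: _from_file_name is a plain text file with one keyword per line
# (splitlines() above treats every line as a separate search term).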
def main():
    print('=' * 60)
    print('\n')
    print('Starting......')
    create_form(_to_file_name)
    print('Reading: %s' % _from_file_name)
    _source = read_filename(fromFileName=_from_file_name)
    # Start worker() in multiple threads and add them to the thread list
    for i in range(1, num_of_threads + 1):
        t = threading.Thread(target=worker, args=(i,))
        threads.append(t)
        t.start()
    # Enqueue a new task every 0.1 seconds
    for item in _source:
        time.sleep(0.1)
        q.put(item)
    q.join()
    print("-----All searches finished-----")
    # Stop the worker threads
    for i in range(num_of_threads):
        q.put(None)
    for t in threads:
        t.join()
# Pick a proxy
def get_proxy():
    proxy_ip = random.choice(_proxies)
    # Use the same randomly chosen proxy for both http and https
    proxies = {'http': proxy_ip, 'https': proxy_ip}
    return proxies
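# Assumption: _proxies in _env.py is a list of proxy URLs in the form accepted by
# requests, e.g. "http://host:port" (optionally with credentials).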
# Pick one account entry from _global at random
def get_cookie():
    _index = random.randint(0, len(_global) - 1)  # random index; list indices start at 0, hence the -1
    _keys = list(_global.keys())  # collect the keys of _global into a list
    _use_key = _keys[_index]  # the randomly chosen key
    _cookie = _global[_use_key]  # look up the matching cookie entry
    return _cookie
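# Each entry in _global must be a dict with at least the keys 'Cookie' and 'Cipher_Text',
# since worker() passes both to response().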
if __name__ == "__main__":
    main()