Browse Source

update tester

pull/4/merge
Germey 7 years ago
parent
commit
5a17ac447a
  1. 1
      cookiespool/api.py
  2. 7
      cookiespool/generator.py
  3. 10
      cookiespool/scheduler.py
  4. 36
      cookiespool/tester.py
  5. 55
      login/weibo/cookies.py

1
cookiespool/api.py

@ -12,7 +12,6 @@ app = Flask(__name__)
def index():
return '<h2>Welcome to Cookie Pool System</h2>'
def get_conn():
"""
获取

7
cookiespool/generator.py

@ -1,11 +1,6 @@
import json
import requests
from selenium import webdriver
from selenium.common.exceptions import WebDriverException, TimeoutException
from selenium.webdriver import DesiredCapabilities
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from cookiespool.config import *
from cookiespool.db import RedisClient
from login.weibo.cookies import WeiboCookies
@ -85,6 +80,8 @@ class CookiesGenerator(object):
print('成功删除账号')
else:
print(result.get('content'))
else:
print('所有账号都已经成功获取Cookies')
def close(self):
"""

10
cookiespool/scheduler.py

@ -10,12 +10,12 @@ class Scheduler(object):
@staticmethod
def valid_cookie(cycle=CYCLE):
while True:
print('Checking Cookies')
print('Cookies检测进程开始运行')
try:
for name, cls in TESTER_MAP.items():
tester = eval(cls + '(name="' + name + '")')
tester.run()
print('Tester Finished')
print('Cookies检测完成')
del tester
time.sleep(cycle)
except Exception as e:
@ -24,20 +24,20 @@ class Scheduler(object):
@staticmethod
def generate_cookie(cycle=CYCLE):
while True:
print('Generating Cookies')
print('Cookies生成进程开始运行')
try:
for name, cls in GENERATOR_MAP.items():
generator = eval(cls + '(name="' + name + '")')
generator.run()
print('Generator Finished')
print('Cookies生成完成')
generator.close()
print('Deleted Generator')
time.sleep(cycle)
except Exception as e:
print(e.args)
@staticmethod
def api():
print('API接口开始运行')
app.run(host=API_HOST, port=API_PORT)
def run(self):

36
cookiespool/tester.py

@ -10,42 +10,38 @@ class ValidTester(object):
self.cookies_db = RedisClient('cookies', self.name)
self.account_db = RedisClient('accounts', self.name)
def test(self, account, cookies):
def test(self, username, cookies):
raise NotImplementedError
def run(self):
accounts = self.cookies_db.all()
print(accounts)
for account in accounts:
username = account.get('username')
cookies = self.cookies_db.get(username)
self.test(account, cookies)
cookies_groups = self.cookies_db.all()
for username, cookies in cookies_groups.items():
self.test(username, cookies)
class WeiboValidTester(ValidTester):
def __init__(self, name='weibo'):
ValidTester.__init__(self, name)
def test(self, account, cookies):
print('Testing Account', account.get('username'))
def test(self, username, cookies):
print('正在测试Cookies', '用户名', username)
try:
cookies = json.loads(cookies)
except TypeError:
print('Invalid Cookies Value', account.get('username'))
self.cookies_db.delete(account.get('username'))
print('Deleted User', account.get('username'))
return None
print('Cookies不合法', username)
self.cookies_db.delete(username)
print('删除Cookies', username)
return
try:
test_url = TEST_URL_MAP[self.name]
response = requests.get(test_url, cookies=cookies, timeout=5, allow_redirects=False)
if response.status_code == 200:
print('Valid Cookies', account.get('username'))
print('Cookies有效', username)
print('部分测试结果', response.text[0:50])
else:
print(response.status_code, response.headers)
print('Invalid Cookies', account.get('username'))
self.cookies_db.delete(account.get('username'))
print('Deleted User', account.get('username'))
print('Cookies失效', username)
self.cookies_db.delete(username)
print('删除Cookies', username)
except ConnectionError as e:
print('Error', e.args)
print('Invalid Cookies', account.get('username'))
print('发生异常', e.args)

55
login/weibo/cookies.py

@ -1,8 +1,6 @@
import os
import time
from io import BytesIO
from PIL import Image
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By
@ -158,31 +156,34 @@ class WeiboCookies():
:return:
"""
# 获得四个按点
circles = self.browser.find_elements_by_css_selector('.patt-wrap .patt-circ')
dx = dy = 0
for index in range(4):
circle = circles[numbers[index] - 1]
# 如果是第一次循环
if index == 0:
# 点击第一个按点
ActionChains(self.browser) \
.move_to_element_with_offset(circle, circle.size['width'] / 2, circle.size['height'] / 2) \
.click_and_hold().perform()
else:
# 小幅移动次数
times = 30
# 拖动
for i in range(times):
ActionChains(self.browser).move_by_offset(dx / times, dy / times).perform()
time.sleep(1 / times)
# 如果是最后一次循环
if index == 3:
# 松开鼠标
ActionChains(self.browser).release().perform()
else:
# 计算下一次偏移
dx = circles[numbers[index + 1] - 1].location['x'] - circle.location['x']
dy = circles[numbers[index + 1] - 1].location['y'] - circle.location['y']
try:
circles = self.browser.find_elements_by_css_selector('.patt-wrap .patt-circ')
dx = dy = 0
for index in range(4):
circle = circles[numbers[index] - 1]
# 如果是第一次循环
if index == 0:
# 点击第一个按点
ActionChains(self.browser) \
.move_to_element_with_offset(circle, circle.size['width'] / 2, circle.size['height'] / 2) \
.click_and_hold().perform()
else:
# 小幅移动次数
times = 30
# 拖动
for i in range(times):
ActionChains(self.browser).move_by_offset(dx / times, dy / times).perform()
time.sleep(1 / times)
# 如果是最后一次循环
if index == 3:
# 松开鼠标
ActionChains(self.browser).release().perform()
else:
# 计算下一次偏移
dx = circles[numbers[index + 1] - 1].location['x'] - circle.location['x']
dy = circles[numbers[index + 1] - 1].location['y'] - circle.location['y']
except:
return False
def get_cookies(self):
"""

Loading…
Cancel
Save