自动更新白名单IP
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

133 lines
3.6 KiB

import logging
import os
import re
import time
from json import dumps

try:
    from urllib import urlencode, unquote
    from urlparse import urlparse, parse_qsl, ParseResult
except ImportError:
    # Python 3 fallback
    from urllib.parse import (
        urlencode, unquote, urlparse, parse_qsl, ParseResult
    )

import requests
# Dotted-quad IPv4 pattern. Each octet is limited to 0-255 using ASCII digits
# only (leading zeros such as "010" are still accepted). The "$" anchor is kept
# on purpose: it tolerates a single trailing newline, as the previous pattern did.
_IPV4 = re.compile(
    r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9]?[0-9])\.){3}'
    r'(?:25[0-5]|2[0-4][0-9]|[01]?[0-9]?[0-9])$'
)
def init_logging(name, level=logging.INFO, filename=None):
    """Return the logger called *name*, configured for console (and file) output.

    The function is safe to call repeatedly for the same name: a handler is
    only attached when an equivalent one is not already present, so records
    are not emitted multiple times.

    :param name: logger name passed to ``logging.getLogger``
    :param level: level applied to the logger and to a newly added file handler
    :param filename: optional path of a log file opened in append mode
    :return: the configured ``logging.Logger``
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")

    if filename:
        # FileHandler stores the absolute path in ``baseFilename``.
        log_path = os.path.abspath(filename)
        has_file_handler = any(
            isinstance(h, logging.FileHandler) and h.baseFilename == log_path
            for h in logger.handlers
        )
        if not has_file_handler:
            fh = logging.FileHandler(filename, "a+")
            fh.setLevel(level)
            fh.setFormatter(formatter)
            logger.addHandler(fh)

    # FileHandler subclasses StreamHandler, so exclude it when looking for an
    # already attached console handler.
    has_console_handler = any(
        isinstance(h, logging.StreamHandler)
        and not isinstance(h, logging.FileHandler)
        for h in logger.handlers
    )
    if not has_console_handler:
        ch = logging.StreamHandler()
        ch.setFormatter(formatter)
        logger.addHandler(ch)

    return logger
def add_url_params(url, params):
    """Add GET params to the provided URL, keeping the ones it already has.

    Existing query arguments are decoded by ``parse_qsl`` (blank values are
    kept), merged with *params* (which win on key clashes) and re-encoded.
    The URL itself is NOT unquoted as a whole: doing so would turn encoded
    ``&``, ``=``, ``#`` or spaces inside the path or a value into real
    delimiters and corrupt the result. Repeated keys already present in the
    URL collapse to their last value.

    :param url: string of target URL
    :param params: dict containing requested params to be added
    :return: string with updated URL

    Parameter order follows dict iteration order (the output below is for
    interpreters whose dicts preserve insertion order):

    >>> url = 'http://stackoverflow.com/test?answers=true'
    >>> new_params = {'answers': False, 'data': ['some', 'values']}
    >>> add_url_params(url, new_params)
    'http://stackoverflow.com/test?answers=false&data=some&data=values'
    """
    parsed_url = urlparse(url)

    # parse_qsl percent-decodes each key/value itself, so no prior unquote
    # is needed; keep_blank_values preserves arguments such as "a=".
    query_args = dict(parse_qsl(parsed_url.query, keep_blank_values=True))
    query_args.update(params)

    # Bool and dict values are serialised to their JSON form
    # (e.g. False -> "false") before being URL-encoded.
    for key, value in list(query_args.items()):
        if isinstance(value, (bool, dict)):
            query_args[key] = dumps(value)

    # doseq=True expands list values into repeated "key=value" pairs.
    encoded_query = urlencode(query_args, doseq=True)

    # Rebuild the URL from the original components with the new query string.
    return ParseResult(
        parsed_url.scheme, parsed_url.netloc, parsed_url.path,
        parsed_url.params, encoded_query, parsed_url.fragment
    ).geturl()
def validate_ipv4(ip):
    """Tell whether *ip* matches the module-level ``_IPV4`` pattern.

    Always returns a real bool (the match object is not leaked to callers).

    :param ip: candidate address string, or None
    :return: True if *ip* matches ``_IPV4``, otherwise False
    """
    if ip is None:
        return False
    return _IPV4.match(ip) is not None
# Module-wide logger used by the functions below; init_logging attaches a
# file handler for log.txt plus a console stream handler.
logger = init_logging("renew", filename="log.txt")
def get_ip(url="http://current-ip.16yun.cn:802", timeout=10):
    """Fetch the current public IPv4 address from an IP-echo service.

    :param url: endpoint whose response body is expected to be the bare IP
    :param timeout: seconds to wait for the HTTP request before giving up;
        without it a stalled connection would block the caller indefinitely
    :return: the IP as a string (surrounding whitespace removed), or None if
        the request failed or the body is not a valid IPv4 address
    :raises requests.exceptions.RequestException: on connection errors/timeouts
    """
    r = requests.get(url, timeout=timeout)
    try:
        # Strip so a trailing newline in the body never ends up in the IP.
        ip = r.text.strip() if r.ok else None
    finally:
        r.close()

    if ip and validate_ipv4(ip):
        logger.info("get public ip {}".format(ip))
        return ip

    logger.warning("could not get public ip (status {})".format(r.status_code))
    return None
def renew_ip(ip, url, index, timeout=10):
    """Ask the whitelist service to replace entry *index* with *ip*.

    The request URL is *url* with ``index`` and ``newip`` added as GET params.

    :param ip: new IP address to register
    :param url: base URL of the renew endpoint
    :param index: value sent as the ``index`` parameter
    :param timeout: seconds to wait for the HTTP request before giving up
    :return: True if the service answered with a success status, else False
    :raises requests.exceptions.RequestException: on connection errors/timeouts
    """
    url = add_url_params(url, {"index": index, "newip": ip})
    logger.info("url {}".format(url))
    r = requests.get(url, timeout=timeout)
    try:
        if r.ok:
            logger.info("renew ip {}".format(ip))
            return True
        logger.error(r.text)
        return False
    finally:
        r.close()
def main(url, index, wait):
    """Poll the public IP forever and renew the whitelist when it changes.

    :param url: base URL of the renew endpoint, passed to ``renew_ip``
    :param index: whitelist index, passed to ``renew_ip``
    :param wait: seconds to sleep between polls
    """
    old_ip = None
    while True:
        try:
            ip = get_ip()
            # Skip failed lookups (None) so "newip=None" is never submitted.
            if ip and ip != old_ip:
                # Remember the IP only after a successful renewal, so a
                # failed attempt is retried on the next poll.
                if renew_ip(ip=ip, url=url, index=index):
                    old_ip = ip
        except Exception as err:
            # Top-level boundary: log with traceback and keep polling.
            logger.exception(err)
        time.sleep(wait)
if __name__ == "__main__":
    # Script entry point: read url/index/wait from the [main] section of
    # config.cfg and start the polling loop.
    try:
        import ConfigParser as configparser  # Python 2
    except ImportError:
        import configparser  # Python 3

    config = configparser.ConfigParser()
    with open('config.cfg') as config_file:
        # read_file replaces readfp on Python 3 (readfp was removed in 3.12);
        # Python 2 only provides readfp.
        read_config = getattr(config, "read_file", None) or config.readfp
        read_config(config_file)

    url = config.get("main", "url")
    index = config.getint("main", "index")
    wait = config.getint("main", "wait")
    logger.info("index {}".format(index))
    main(url=url, index=index, wait=wait)