import json
import os
import requests
from concurrent.futures import ThreadPoolExecutor

class CrawlThreadPool(object):
    '''
    Thread pool with up to 5 concurrent workers for fetching URL links and
    parsing the results; parsed results are delivered through the
    complete_callback argument of the crawl method.
    '''
    def __init__(self):
        self.thread_pool = ThreadPoolExecutor(max_workers=5)

    def _request_parse_runnable(self, addr, startblock):
        try:
            # The endpoint and API key are redacted in the original; the
            # parameter names follow an Etherscan-style account API, and
            # passing startblock here is an assumption inferred from the
            # call in CrawlManager.start_runner below.
            param = {
                'apikey': 'xxxxxxx',
                'module': 'account',
                'address': addr,
                'startblock': startblock,
            }
            response = requests.get('http://api.xxxxx.io/api', params=param)
            data_json = json.loads(response.text)
            if data_json['message'] == 'OK':
                data = response.text
            else:
                print('status: ' + data_json['message'])
                data = None
        except Exception as e:
            print(str(e))
            data = None
        return data

    def crawl(self, addr, startblock, complete_callback):
        future = self.thread_pool.submit(self._request_parse_runnable, addr, startblock)
        future.add_done_callback(complete_callback)
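
# Usage sketch (illustrative only; handle_result and the address are
# placeholders, not part of the original code):
#
#   def handle_result(future):
#       print(future.result())
#
#   pool = CrawlThreadPool()
#   pool.crawl('0x...', 5000000, handle_result)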

class OutPutThreadPool(object):
    '''
    Thread pool with up to 5 concurrent workers that persists the results
    produced by the crawl/parse thread pool above.
    '''
    def __init__(self):
        self.thread_pool = ThreadPoolExecutor(max_workers=5)

    def _output_runnable(self, crawl_result, addr):
        try:
            if crawl_result is None:
                return
            output_dir = os.path.join('.', 'data')
            os.makedirs(output_dir, exist_ok=True)  # make sure ./data exists
            output_filename = os.path.join(output_dir, addr + '.json')
            with open(output_filename, 'a') as output_file:
                output_file.write(crawl_result)
                output_file.write('\n')
            print('saved successfully!')
        except Exception as e:
            print('save file error. -> ' + str(e))

    def save(self, crawl_result, addr):
        self.thread_pool.submit(self._output_runnable, crawl_result, addr)
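
# Note: with max_workers=5, several workers may append to the same file
# concurrently, so written lines can interleave. A single-worker pool keeps
# writes strictly ordered (a simple alternative, not the original design):
#
#   self.thread_pool = ThreadPoolExecutor(max_workers=1)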

class CrawlManager(object):
    '''
    Crawler manager: owns the crawl/parse thread pool and the storage
    thread pool.
    '''
    def __init__(self, addr):
        self.crawl_pool = CrawlThreadPool()
        self.output_pool = OutPutThreadPool()
        self.addr = addr

    def _crawl_future_callback(self, crawl_url_future):
        try:
            data = crawl_url_future.result()
            self.output_pool.save(data, self.addr)
        except Exception as e:
            print('Run crawl url future thread error. ' + str(e))

    def start_runner(self):
        # Start blocks in [5000000, 6000000), stepping by 10000.
        for startblock in range(5000000, 6000000, 10000):
            self.crawl_pool.crawl(self.addr, startblock, self._crawl_future_callback)

if __name__ == '__main__':
    addr = '0x0000000000000000000000000000000000000000'  # placeholder; use a real address
    CrawlManager(addr).start_runner()
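
# Note: ThreadPoolExecutor workers are joined at interpreter exit, so the
# process only finishes after every submitted task has run. For an explicit,
# orderly teardown, start_runner could end with a shutdown step (a sketch,
# not part of the original code):
#
#   self.crawl_pool.thread_pool.shutdown(wait=True)
#   self.output_pool.thread_pool.shutdown(wait=True)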