def main(path="123.txt", fetch=None, max_workers=100):
    """Probe every ``ip,port`` line of *path* concurrently and report failures.

    Every target is submitted to the pool up front, so one ``Future`` per
    input line is held in ``future_dict`` until the run finishes. Memory use
    therefore grows with the number of lines in the input file.

    Args:
        path: Text file containing one ``ip,port`` pair per line.
        fetch: Callable invoked as ``fetch(ip, port)`` for each target.
            Defaults to the module-level ``get_url``.
        max_workers: Number of worker threads in the pool.
    """
    if fetch is None:
        fetch = get_url

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_dict = {}
        with open(path) as f:
            # Submit every target in the file.
            for line in f:
                entry = line.strip()
                if not entry:
                    # A blank line has no port field; indexing it would raise.
                    continue
                # Split once and reuse the fields for both the call and the key.
                ip_port = entry.split(",")
                if len(ip_port) < 2:
                    print('skipping malformed line: %r' % entry)
                    continue
                future_dict[executor.submit(fetch, ip_port[0], ip_port[1])] = ip_port
        print(len(future_dict))

        # Wait for all submitted work to finish.
        for future in concurrent.futures.as_completed(future_dict):
            ip_port = future_dict[future]
            try:
                # The result itself is discarded; only failures are reported.
                future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (ip_port, exc))
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
我当时发现,内存直接被撑爆了。为什么?其实程序中的第18行已经有答案了。在concurrent.futures官方的示例中,会先将所有future对象存到一个字典中,然后再等待所有线程执行完毕。虽然读取文件时用了for line in f这种形式逐行迭代,但5000000个future对象还是太大了。
import requests import concurrent.futures import queue import json from time import ctime
# Bounded FIFO queue shared at module level; it holds at most 100 entries.
ip_queue = queue.Queue(maxsize=100)
def get_url(ip, port):
    """Fetch ``http://<ip>:<port>/path_info`` and return the response body.

    Failures are returned rather than raised, so callers must check the type
    of the result to tell a body apart from an error.

    Args:
        ip: Host part of the URL.
        port: Port part of the URL.

    Returns:
        The response body as text on success; otherwise the exception
        instance that was caught.
    """
    try:
        r = requests.get("http://{}:{}/path_info".format(ip, port), timeout=5)
        return r.text
    except Exception as e:
        # Best-effort probe: hand the failure back as a value instead of
        # letting it propagate out of the worker.
        return e
def _scan_batch(batch, fetch, max_workers, timeout):
    """Probe one batch of targets on its own thread pool and print each outcome."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_dict = {
            executor.submit(fetch, ip_port[0], ip_port[1]): ip_port
            for ip_port in batch
        }
        try:
            for future in concurrent.futures.as_completed(future_dict, timeout=timeout):
                # Remove finished futures so that only unfinished ones are
                # left to report if the wait below times out.
                ip_port = future_dict.pop(future)
                try:
                    data = future.result()
                except Exception as exc:
                    print('%r generated an exception: %s' % (ip_port, exc))
                else:
                    print(data)
        except concurrent.futures.TimeoutError:
            # Do not let one slow batch abort the whole scan: cancel what has
            # not started yet and report everything that did not finish.
            for future, ip_port in future_dict.items():
                future.cancel()
                print('%r did not finish within %s seconds' % (ip_port, timeout))


def main(path="./ipfile", fetch=None, batch_size=100, max_workers=100, timeout=30):
    """Probe the ``ip,port`` targets listed in *path*, one bounded batch at a time.

    At most ``batch_size`` futures exist at any moment, which keeps memory
    use independent of the size of the input file. Each batch runs on a
    freshly created thread pool.

    Args:
        path: Text file containing one ``ip,port`` pair per line.
        fetch: Callable invoked as ``fetch(ip, port)`` for each target.
            Defaults to the module-level ``get_url``.
        batch_size: Number of targets submitted together (positive integer).
        max_workers: Number of worker threads in each batch's pool.
        timeout: Seconds to wait for a whole batch before giving up on its
            unfinished targets.
    """
    if fetch is None:
        fetch = get_url

    batch = []
    with open(path) as f:
        for line in f:
            entry = line.strip()
            if not entry:
                continue
            ip_port = entry.split(",")
            if len(ip_port) < 2:
                print('skipping malformed line: %r' % entry)
                continue
            # Always keep the current line; the batch is flushed only after
            # the line has been added, so no target is dropped.
            batch.append(ip_port)
            if len(batch) >= batch_size:
                _scan_batch(batch, fetch, max_workers, timeout)
                batch = []

    # Flush the final, partially filled batch left over at end of file.
    if batch:
        _scan_batch(batch, fetch, max_workers, timeout)
import requests import concurrent.futures import queue import json from time import ctime
# Bounded FIFO queue shared at module level; it holds at most 100 entries.
ip_queue = queue.Queue(maxsize=100)
def get_url(ip, port):
    """Fetch ``http://<ip>:<port>/path_info`` and return only its first line.

    The body is streamed so that just the first line is read instead of the
    whole response. Failures are returned rather than raised, so callers
    must check the type of the result.

    Args:
        ip: Host part of the URL.
        port: Port part of the URL.

    Returns:
        The first line yielded by ``iter_lines()`` on success (bytes by
        default in requests); otherwise the exception instance that was
        caught. A body with no lines yields a ``StopIteration`` instance.
    """
    try:
        # With stream=True the connection stays open until the response is
        # closed, so close it explicitly once the first line has been read.
        with requests.get(
            "http://{}:{}/path_info".format(ip, port), timeout=5, stream=True
        ) as r:
            return next(r.iter_lines())
    except Exception as e:
        # Best-effort probe: hand the failure back as a value instead of
        # letting it propagate out of the worker.
        return e
def _drain_batch(executor, batch, fetch, timeout):
    """Submit one batch of targets to *executor* and print each outcome."""
    future_dict = {
        executor.submit(fetch, ip_port[0], ip_port[1]): ip_port
        for ip_port in batch
    }
    try:
        for future in concurrent.futures.as_completed(future_dict, timeout=timeout):
            # Remove finished futures so that only unfinished ones are left
            # to report if the wait below times out.
            ip_port = future_dict.pop(future)
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (ip_port, exc))
            else:
                print(data)
    except concurrent.futures.TimeoutError:
        # Do not let one slow batch abort the whole scan: cancel what has not
        # started yet and report everything that did not finish.
        for future, ip_port in future_dict.items():
            future.cancel()
            print('%r did not finish within %s seconds' % (ip_port, timeout))


def main(path="./ipfile", fetch=None, batch_size=100, max_workers=10, timeout=30):
    """Probe the ``ip,port`` targets listed in *path* in bounded batches.

    A single thread pool is reused for the whole run, and at most
    ``batch_size`` futures exist at any moment, which keeps memory use
    independent of the size of the input file.

    Args:
        path: Text file containing one ``ip,port`` pair per line.
        fetch: Callable invoked as ``fetch(ip, port)`` for each target.
            Defaults to the module-level ``get_url``.
        batch_size: Number of targets submitted together (positive integer).
        max_workers: Number of worker threads in the shared pool.
        timeout: Seconds to wait for a whole batch before giving up on its
            unfinished targets.
    """
    if fetch is None:
        fetch = get_url

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        batch = []
        with open(path) as f:
            for line in f:
                entry = line.strip()
                if not entry:
                    continue
                ip_port = entry.split(",")
                if len(ip_port) < 2:
                    print('skipping malformed line: %r' % entry)
                    continue
                # Always keep the current line; the batch is flushed only
                # after the line has been added, so no target is dropped.
                batch.append(ip_port)
                if len(batch) >= batch_size:
                    _drain_batch(executor, batch, fetch, timeout)
                    batch = []

        # Flush the final, partially filled batch left over at end of file.
        if batch:
            _drain_batch(executor, batch, fetch, timeout)