concurrent和requests的那些坑

本文最后更新于:2020年9月16日 上午

摘要

这篇文章记录了一下python concurrent.future和requests的一些坑,不是程序猿的铁子们直接关掉吧。

起因

最近在写一个爬虫,需要爬5000000个IP,待爬的IP以IP,PORT的形式存储在一个100Mb的文件中。这么粗略算一下,一秒钟一个IP,5000000个IP需要差不多57天,这肯定不行啊,得用多线程。但是使用的过程中,发现还是遇到了很多坑,这里分享出来,以免其他小伙伴踩我踩过的坑吧。

先说结论

节约大家时间,先说结论吧!我遇到了以下几个坑:

  • 不要一次将大文件的内容读到一个列表中。5000000个IP,虽然文件看起来只有100Mb,但程序直接怼到列表中,内存爆了。尽量读一行处理一行,即使用以下的形式读文件:

    1
    2
    3
    with open("file") as f:
    for f in line:
    print(f)
  • 使用多线程时,处理函数一定一定一定一定不要阻塞,concurrent.futures只有当所有子线程运行结束后,才会释放资源

  • requests库在处理某些请求时会阻塞卡死。上面刚说多线程不要阻塞,这里就阻塞了。实际上是这次我爬取的过程中,发现了部分奇葩的IP,使用requests的timeout参数对它们无效,会始终阻塞。

  • 可以了解一下eventlet库的Timeout。如果实在无法解决某些处理流程中(比如上面requests)的阻塞,建议使用eventlet库的Timeout。

曲折历程

读IP干爆内存

一开始要做这个爬虫的时候,我想要用多线程,自然而然想到了concurrent.futures,参考说明文档的示例,我写了第一版程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import concurrent.futures
import requests


def get_url(ip, port):
try:
r = requests.get("http://{}:{}/path".foramt(ip, port), timeout=3)
return r.text
except:
return ""


def main():
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
with open("123.txt") as f:
# submit所有IP
future_dict = {executor.submit(get_url, line.strip().split(",")[0], line.strip().split(",")[1]): line.strip().split(",") for line in f}
print(len(future_dict))

# 等待所有线程执行完毕
for future in concurrent.futures.as_completed(future_dict):
ip_port = future_dict[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (ip_port, exc))
else:
# print(data)
pass

if __name__ == "__main__":
main()

我当时发现,内存直接被干爆了。为什么?其实程序中的18行已经有答案了。concurrent.future官方的示例中,会将所有future对象存到一个字典中,然后再等待所有线程执行完毕。虽然用了for line in f这种形式迭代,但5000000的future对象,还是太大了。

那咋办嘛?就控制future_dict的长度呗,规定一个长度,执行完毕直接释放内存,所以我使用了Queue,保证传入future_dict的内容大小始终可控,第二版代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import requests
import concurrent.futures
import queue
import json
from time import ctime

ip_queue = queue.Queue(100)


def get_url(ip, port):
try:

r = requests.get("http://{}:{}/path_info".format(ip, port),
timeout=5)
return r.text
except Exception as e:
return e


def main():
future_dict = {}
with open("./ipfile") as f:
for line in f:
if not ip_queue.full():
ip_queue.put(line.strip().split(","))
else:
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
while not ip_queue.empty():
try:
ip_port = ip_queue.get(block=False)
future_dict[executor.submit(get_url, ip_port[0], ip_port[1])] = ip_port
except Exception as e:
print(e)

for future in concurrent.futures.as_completed(future_dict, timeout=30):
ip_port = future_dict[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (ip_port, exc))
else:
print(data)
future_dict.clear()

print("finish")


if __name__ == "__main__":
main()

跑着跑着就被系统kill了

就像预期的,上面第二版的程序在刚开始运行的时候可以正常跑,但是跑一段时间后,会没有任何提示,直接被系统kill掉。为什么呢?通过htop监控程序运行过程中的状态,我发现内存会明显随着运行时间的边长也增大。

既然使用了Queue,程序中该清理释放内存的地方(比如future_dict都做了相应的的处理),那为什么内存还会出问题呢?其实,有两点:**concurrent.future的内存释放机制和卡死的requests请求。**

concurrent.future的内存释放机制

第二版有一个致命的地方:我们使用了with的方式开了一个Executor,第35行虽然设置了超时,但是concurrent.future只会等所有子线程执行完成后才会return并释放内存,可以仔细读一下下图的相关说明。

concurrent内存清理机制

第二版程序的35行设置了超时,所以程序执行的逻辑是:ip_queue第一次满之后,开一个executor,对ip_queue中的ip进行处理,但由于一些未知的异常,导致某一些子线程没有在30秒的时间内执行完,这时主线程会报超时的错误,不等第一个executor所有的子线程执行完,释放内存,就直接进入下一轮。如果第一轮的子线程能处理好所有异常,在之后退出,那么不会有内存问题,因为虽然提前进入了第二轮,某个时段存在两个甚至多个executor,但前面的executor总会退出的,内存总会释放的,对吧。

但事实和我们想的不一样,子线程实际上是在执行get_url()函数,若get_url()函数本身逻辑有问题,没有办法处理所有的异常被挂起,那么就会有相当的executor挂着,不能释放,当内存占用到一定程度,操作系统自然就会kill掉我们的程序了。

requests卡死?挂起?

正常来说,我们第二版的代码里面,requests是设了超时的,但为什么get_url()函数就卡死了呢?其实部分IP,可能它返回的内容非常大,requests默认的get,会将其临时保存在内存中,那假如我们爬虫同时遇到10个返回100Mb内容的IP,1个Gb的内容,1M的带宽,算算要多久才能下完?当然就卡住了。带宽越大,下得越快,线程越多,内存炸的也就更快。

咋办呢?其实requests提供了流的模式,使用方式参照官方的文档吧,这里贴一个官方例子:

1
2
3
4
5
6
7
8
9
10
11
import json
import requests

r = requests.get('https://httpbin.org/stream/20', stream=True)

for line in r.iter_lines():

# filter out keep-alive new lines
if line:
decoded_line = line.decode('utf-8')
print(json.loads(decoded_line))

根据需求改动下,实时写入文件就行了。

设置子线程的超时异常?

上面分析了被系统kill的原因,实际上还是内存炸了,所以修改了代码,第三版程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import requests
import concurrent.futures
import queue
import json
from time import ctime

ip_queue = queue.Queue(100)


def get_url(ip, port):
try:

r = requests.get("http://{}:{}/path_info".format(ip, port),
timeout=5, stream=True)
return next(r.iter_lines())
except Exception as e:
return e


def main():
future_dict = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
with open("./ipfile") as f:
for line in f:
if not ip_queue.full():
ip_queue.put(line.strip().split(","))
else:
while not ip_queue.empty():
try:
ip_port = ip_queue.get(block=False)
future_dict[executor.submit(get_url, ip_port[0], ip_port[1])] = ip_port
except Exception as e:
print(e)

for future in concurrent.futures.as_completed(future_dict, timeout=30):
ip_port = future_dict[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (ip_port, exc))
else:
print(data)
future_dict.clear()

print("finish")


if __name__ == "__main__":
main()

结果呢,还是没能如愿,程序依然会出现卡死的现象。后来我分析原因,是部分IP不太正常,会一直返回内容,类似摄像头,所以永远不可能接收完内容,那这个子线程就一直挂着了。咋办?我就想,能不能设置子线程的超时,后来发现了一个库:eventlet

所以最终,第四版程序的样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import requests
from requests.auth import HTTPBasicAuth
import concurrent.futures
import queue
import json
from time import ctime
import eventlet

eventlet.monkey_patch()

ip_queue = queue.Queue(200)


def get_url(ip, port):
with eventlet.Timeout(20):
with requests.Session() as s:
try:
r = s.get("http://{}:{}/path_info".format(ip, port),
timeout=(3, 3))
s.close()
return r.text
except eventlet.timeout.Timeout as e:
print("eventlet timeout:", e)
s.close()
return ""
except Exception as e:
s.close()
return ""


def multithread_attach():
future_dict = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
ip_port = ""
while not ip_queue.empty():
try:
ip_port = ip_queue.get(block=False)
# print(ip_port)
future_dict[executor.submit(attach_tools, ip_port[0], ip_port[1])] = ip_port
except Exception as e:
print(e)
print("time:{}".format(ctime()), "200", "start attach")
print("last ip:", ip_port)

for future in concurrent.futures.as_completed(future_dict):
ip_port = future_dict[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (ip_port, exc))
else:
pass
print("time:{}".format(ctime()), "200", "finish attach")
print("last ip:", ip_port, "\n")
del future_dict


def main():
with open("./ipfile") as f:
for line in f:
if not ip_queue.full():
ip_queue.put(line.strip().split(","))
else:
multithread_attach()

multithread_attach()
print("finish")


if __name__ == "__main__":
main()

最后

其实,收获最大的是concurrent.future的内存处理机制吧,这个坑肯定是不踩不知道的,python好像主动停止正在运行的子线程也比较麻烦,所以eventlet这个库,在某些场景给子线程设置超时也是一种挺好的方式。