我正在参与「掘金启航计划」
先放定论
id | 线程/进程/线程行列/进程行列 | 10W数据进行列时刻 | 10W数据出行列时刻 | 100W数据进行列时刻 | 100W数据出行列时刻 | 成果剖析 |
---|---|---|---|---|---|---|
1 | collections.deque |
180ms | 380ms | 190ms | 376ms | 功能:高 |
2 | multiprocessing.Queue |
300ms | 12s | 1,92s | 2m15s | 功能:一般。剖析:进程行列为保证数据一致性,在操作数据的时分会有锁的操作 |
3 | 组合运用1:mq供给数据给collections.deque运用 | 300ms | 11s | 3.051s | 2m26s | 功能:一般。剖析:成果跟直接运用进程行列相近,耗时主要发生在加锁释放锁上面 |
4 | 组合运用2:一个进程作为出产者出产3个multiprocessing.Queue ,三个线程消费进程行列数据,存储到collections.deque
|
mq=300ms dq=50条/s | mq=50条/s dq=50条/s | mq=3.0s dq=50条/s | mq=dq=50条/s | 功能:差。剖析:在线程中消费进程行列中的数据,可能会由于锁的抢占带来功能问题。100W条数据mq入=6s, MQ抢占时段:mq出=dq入=dq出 = 50条/s, MQ非抢占时段:mq出=dq入=dq出 = 100W/50s。 |
1 collections.deque
功能测验
1.1 test_deque.py
from collections import deque
from multiprocessing import Queue
import datetime
data = "1"*(2**15)
MSG_QUEUE = deque()
someip_queue = Queue()
print("开端进程行列收数据")
for i in range(0, 10 ** 5):
someip_queue.put(data)
if i % 10000 == 0:
time_1 = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(f"{time_1} 接纳数据 {i}")
print("开端解析数据")
for i in range(0, 10 ** 5):
if someip_queue.empty():
print(i)
break
MSG_QUEUE.append(someip_queue.get())
if i % 10000 == 0:
time_1 = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(f"{time_1} 解析数据 {i}")
print("开端匹配数据")
for i in range(0, 10 ** 5):
if MSG_QUEUE.__len__() == 0:
print(i)
break
a = MSG_QUEUE.popleft()
if i % 10000 == 0:
time_1 = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(f"{time_1} 匹配数据 {i}")
print("程序完毕")
1.2 result1 处理10W条数据
python test_deque.py
开端解析数据
13:44:49.648213 解析数据 0
13:44:49.668215 解析数据 100000
13:44:49.687216 解析数据 200000
13:44:49.709217 解析数据 300000
13:44:49.731219 解析数据 400000
13:44:49.751221 解析数据 500000
13:44:49.769222 解析数据 600000
13:44:49.788223 解析数据 700000
13:44:49.807224 解析数据 800000
13:44:49.825226 解析数据 900000
开端匹配数据
13:44:49.843228 匹配数据 0
13:44:49.882230 匹配数据 100000
13:44:49.924233 匹配数据 200000
13:44:49.964237 匹配数据 300000
13:44:50.006240 匹配数据 400000
13:44:50.046971 匹配数据 500000
13:44:50.094976 匹配数据 600000
13:44:50.142980 匹配数据 700000
13:44:50.190983 匹配数据 800000
13:44:50.233984 匹配数据 900000
程序完毕
Process finished with exit code 0
1.3 result 处理100w条数据
开端解析数据
14:08:11.606508 解析数据 0
14:08:11.629508 解析数据 100000
14:08:11.649510 解析数据 200000
14:08:11.671510 解析数据 300000
14:08:11.693514 解析数据 400000
14:08:11.716168 解析数据 500000
14:08:11.736817 解析数据 600000
14:08:11.756819 解析数据 700000
14:08:11.776820 解析数据 800000
14:08:11.796821 解析数据 900000
开端匹配数据
14:08:11.817823 匹配数据 0
14:08:11.856826 匹配数据 100000
14:08:11.904830 匹配数据 200000
14:08:11.944832 匹配数据 300000
14:08:11.982834 匹配数据 400000
14:08:12.023837 匹配数据 500000
14:08:12.065841 匹配数据 600000
14:08:12.108842 匹配数据 700000
14:08:12.152150 匹配数据 800000
14:08:12.193153 匹配数据 900000
程序完毕
Process finished with exit code 0
1.4 成果剖析
- collections.deque 10W条数据进行列耗时=180ms,出行列耗时=380ms;
- collections.deque 100W条数据进行列耗时=190ms,出行列耗时=376ms;
2 multiprocessing.Queue
功能测验
2.1 test_mq.py
import time
from multiprocessing import Queue
import datetime
data = "1"*(2**15)
someip_queue = Queue()
print("开端进程行列收数据")
for i in range(0, 10 ** 5):
someip_queue.put(data)
if i % 10000 == 0:
time_1 = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(f"{time_1} 接纳数据 {i}")
print("开端解析数据")
i = 0
while 1:
if someip_queue.empty():
print(i)
break
i += 1
someip_queue.get()
if i % 10000 == 0:
time_1 = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(f"{time_1} 解析数据 {i}")
print("程序完毕")
2.2 result
开端进程行列收数据
14:00:20.832107 接纳数据 0
14:00:20.863109 接纳数据 10000
14:00:20.896623 接纳数据 20000
14:00:20.930625 接纳数据 30000
14:00:20.964626 接纳数据 40000
14:00:21.001628 接纳数据 50000
14:00:21.036631 接纳数据 60000
14:00:21.072634 接纳数据 70000
14:00:21.104637 接纳数据 80000
14:00:21.135639 接纳数据 90000
开端解析数据
14:00:22.389109 解析数据 10000
14:00:23.551193 解析数据 20000
14:00:24.748545 解析数据 30000
14:00:25.923965 解析数据 40000
42782
程序完毕
2.3 test_mq_upd.py
上面的处理发现someip_queue.empty()会偶发的检测不到行列中还存在数据,导致解析数据循环会终止,更新一下处理逻辑
import time
from multiprocessing import Queue
import datetime
data = "1"*(2**15)
someip_queue = Queue()
print("开端进程行列收数据")
for i in range(0, 10 ** 5):
someip_queue.put(data)
if i % 10000 == 0:
time_1 = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(f"{time_1} 接纳数据 {i}")
print("开端解析数据")
i = 0
while 1:
if someip_queue.empty():
print(i)
time.sleep(0.1) # 针对偶发性someip_queue.empty()判别行列数据为空的状况处理
if someip_queue.empty():
break
i += 1
someip_queue.get()
if i % 10000 == 0:
time_1 = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(f"{time_1} 解析数据 {i}")
print("程序完毕")
2.4 result 10w条数据
开端进程行列收数据
14:03:14.764536 接纳数据 0
14:03:14.796539 接纳数据 10000
14:03:14.828540 接纳数据 20000
14:03:14.859544 接纳数据 30000
14:03:14.891545 接纳数据 40000
14:03:14.923548 接纳数据 50000
14:03:14.953550 接纳数据 60000
14:03:14.985553 接纳数据 70000
14:03:15.026117 接纳数据 80000
14:03:15.063119 接纳数据 90000
开端解析数据
14:03:16.357918 解析数据 10000
14:03:17.537253 解析数据 20000
14:03:18.691518 解析数据 30000
35600
14:03:20.079642 解析数据 40000
14:03:21.302501 解析数据 50000
14:03:22.536545 解析数据 60000
14:03:23.753875 解析数据 70000
14:03:25.119643 解析数据 80000
14:03:26.730257 解析数据 90000
14:03:28.397706 解析数据 100000
100000
程序完毕
Process finished with exit code 0
2.5 result 100w条数据
开端进程行列收数据
14:07:58.701345 接纳数据 0
14:07:59.027795 接纳数据 100000
14:07:59.376901 接纳数据 200000
14:07:59.688924 接纳数据 300000
14:08:00.003947 接纳数据 400000
14:08:00.307593 接纳数据 500000
14:08:00.629275 接纳数据 600000
14:08:00.943386 接纳数据 700000
14:08:01.258410 接纳数据 800000
14:08:01.587435 接纳数据 900000
开端解析数据
12154
83325
14:08:14.096339 解析数据 100000
14:08:26.397118 解析数据 200000
253072
14:08:38.504043 解析数据 300000
319820
385967
396633
14:08:53.691992 解析数据 400000
14:09:10.020712 解析数据 500000
14:09:26.105443 解析数据 600000
14:09:42.529058 解析数据 700000
14:09:58.662050 解析数据 800000
14:10:14.955844 解析数据 900000
926888
14:10:31.449206 解析数据 1000000
1000000
程序完毕
Process finished with exit code 0
2.6 成果剖析
- multiprocessing.Queue行列10W条数据增加数据时长=300ms,出行列时刻= 12s。
- multiprocessing.Queue行列100W条数据增加数据时长=1.92s,出行列时刻 =2m15s。
3 组合运用
- 组合运用1:mq供给数据给collections.deque运用
3.1 test_mq_to_deque.py
import time
from collections import deque
from multiprocessing import Queue
import datetime
data = "1"*(2**15)
MSG_QUEUE = deque()
someip_queue = Queue()
print("开端进程行列收数据")
for i in range(0, 10 ** 5):
someip_queue.put(data)
if i % 10000 == 0:
time_1 = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(f"{time_1} 接纳数据 {i}")
print("开端解析数据")
i = 0
# for i in range(0, 10 ** 5):
while 1:
if someip_queue.empty():
print(i)
time.sleep(0.1)
if someip_queue.empty():
break
i += 1
MSG_QUEUE.append(someip_queue.get())
if i % 10000 == 0:
time_1 = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(f"{time_1} 解析数据 {i}")
print("开端匹配数据")
for i in range(0, 10 ** 5):
if MSG_QUEUE.__len__() == 0:
print(i)
break
a = MSG_QUEUE.popleft()
if i % 10000 == 0:
time_1 = datetime.datetime.now().strftime('%H:%M:%S.%f')
print(f"{time_1} 匹配数据 {i}")
print("程序完毕")
3.2 result 10W条数据
开端进程行列收数据
14:22:56.533544 接纳数据 0
14:22:56.568546 接纳数据 10000
14:22:56.603547 接纳数据 20000
14:22:56.635549 接纳数据 30000
14:22:56.664551 接纳数据 40000
14:22:56.694553 接纳数据 50000
14:22:56.723555 接纳数据 60000
14:22:56.754559 接纳数据 70000
14:22:56.792563 接纳数据 80000
14:22:56.831624 接纳数据 90000
开端解析数据
14:22:58.138838 解析数据 10000
14:22:59.303157 解析数据 20000
14:23:00.470027 解析数据 30000
14:23:01.631228 解析数据 40000
14:23:02.802825 解析数据 50000
14:23:03.968407 解析数据 60000
14:23:05.145760 解析数据 70000
14:23:06.338492 解析数据 80000
14:23:07.573767 解析数据 90000
97402
14:23:08.997958 解析数据 100000
100000
开端匹配数据
14:23:09.097974 匹配数据 0
14:23:09.103976 匹配数据 10000
14:23:09.109976 匹配数据 20000
14:23:09.115975 匹配数据 30000
14:23:09.121977 匹配数据 40000
14:23:09.127977 匹配数据 50000
14:23:09.133978 匹配数据 60000
14:23:09.139978 匹配数据 70000
14:23:09.145978 匹配数据 80000
14:23:09.151979 匹配数据 90000
程序完毕
Process finished with exit code 0
3.3 result 100W 条数据
D:\GITEE\ling-shu\venv\Scripts\python.exe D:/EPT_GIT/ling-shu/deque_test/test_21.py
开端进程行列收数据
14:27:25.206307 接纳数据 0
14:27:25.590652 接纳数据 100000
14:27:25.949772 接纳数据 200000
14:27:26.261963 接纳数据 300000
14:27:26.599625 接纳数据 400000
14:27:26.908648 接纳数据 500000
14:27:27.231650 接纳数据 600000
14:27:27.579674 接纳数据 700000
14:27:27.925000 接纳数据 800000
14:27:28.257025 接纳数据 900000
开端解析数据
18339
14:27:40.513303 解析数据 100000
132049
139293
178448
14:27:54.578327 解析数据 200000
234886
235156
235188
14:28:11.252202 解析数据 300000
322028
322050
322125
14:28:27.853079 解析数据 400000
14:28:43.420115 解析数据 500000
514710
515523
14:29:00.502026 解析数据 600000
632704
690074
14:29:16.253023 解析数据 700000
14:29:32.181067 解析数据 800000
839646
855881
14:29:49.207849 解析数据 900000
927798
14:30:06.034237 解析数据 1000000
1000000
开端匹配数据
14:30:06.134542 匹配数据 0
程序完毕
Process finished with exit code 0
- 组合运用2:一个进程作为出产者出产3个
multiprocessing.Queue
,三个线程消费进程行列数据,存储到collections.deque
3.4 test_mq_to_dq.py
import datetime
import multiprocessing
import threading
import time
from collections import deque
mq1 = multiprocessing.Queue()
dq1 = deque()
def th1(mq1, dq1):
time.sleep(2)
print("开端解析dq1......dq count 0")
while 1:
if mq1.empty():
continue
time_1 = datetime.datetime.now().strftime('%H:%M:%S.%f')
a = mq1.get()
dq1.append(a)
dc1 = dq1.__len__()
if dc1 % 100000 == 0:
print(f"{time_1} dq1 count: {dc1}")
if dc1 <= 1000:
if dc1 % 100 == 0:
print(f"{time_1} dq1 count: {dc1}")
def mq_4(mq1):
j = 0
c = "*" * 10000
while 1:
time_1 = datetime.datetime.now().strftime('%H:%M:%S.%f')
mq1.put(c)
if j % 100000 == 0:
print(time_1, j, "*" * 20)
if j == 1000000:
break
j += 1
# time.sleep(0.001)
if __name__ == '__main__':
threading.Thread(target=th1, args=(mq1, dq1)).start()
multiprocessing.Process(target=mq_4, args=(mq1,)).start()
- result
D:\VENV_venv\Scripts\python.exe D:/GIT/xxxxx.py
11:08:34.639912 0 ********************
11:08:35.257405 100000 ********************
11:08:35.855758 200000 ********************
11:08:36.435938 300000 ********************
开端解析dq1......dq count 0
11:08:37.055326 400000 ********************
11:08:37.690540 500000 ********************
11:08:38.307818 dq1 count: 100
11:08:38.316020 600000 ********************
11:08:38.950870 700000 ********************
11:08:39.593406 800000 ********************
11:08:40.200279 900000 ********************
11:08:40.329467 dq1 count: 200
11:08:40.799896 1000000 ********************
11:08:40.803896 dq1 count: 300
11:08:40.807895 dq1 count: 400
11:08:40.810895 dq1 count: 500
11:08:40.815959 dq1 count: 600
11:08:40.819291 dq1 count: 700
11:08:40.823292 dq1 count: 800
11:08:40.827292 dq1 count: 900
11:08:40.831906 dq1 count: 1000
11:08:45.704080 dq1 count: 100000
11:08:50.647042 dq1 count: 200000
11:08:55.585346 dq1 count: 300000
11:09:00.602610 dq1 count: 400000
11:09:05.558859 dq1 count: 500000
11:09:10.636679 dq1 count: 600000
11:09:15.671303 dq1 count: 700000
11:09:20.650588 dq1 count: 800000
11:09:26.240302 dq1 count: 900000
11:09:31.531976 dq1 count: 1000000
Process finished with exit code -1
3.5 成果剖析
- 组合运用1:10W条数据增加数据时长=300ms,出行列时刻= 11s。
- 组合运用1:100W条数据增加数据时长=3.051s,出行列时刻 =2m26s。
- 组合运用2:100W条数据mq入=3s, MQ抢占时段:mq出=dq入=dq出 = 50条/s, MQ非抢占时段100W/50s。
5 综合剖析
- MQ在数据量比较大的时分,抢占行列会导致出行列速度降低。
- 不推荐组合运用,建议运用进程+MQ或都运用线程+collect.deque比较好。