{"ip":"xxx.xxx.xxx.xxx","timestamp":"2017-05-27T04:00:35-04:00","data":{},"error":"EOF","error_component":"banner"}
{"ip":"xxx.xxx.xxx.xxx","timestamp":"2017-05-27T04:00:35-04:00","data":{"banner":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx","ehlo":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx","starttls":"500 Unknown command\r\n"},"error":"Bad return code for STARTTLS","error_component":"starttls"}
{"ip":"xxx.xxx.xxx.xxx","timestamp":"2017-05-27T04:00:35-04:00","data":{},"error":"EOF","error_component":"banner"}
{"ip":"xxx.xxx.xxx.xxx","timestamp":"2017-05-27T04:00:35-04:00","data":{"banner":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx","ehlo":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx","starttls":"502 No such command\r\n"},"error":"Bad return code for STARTTLS","error_component":"starttls"}
CIDR文件
2000行
86.80.0.0/12
77.160.0.0/12
188.200.0.0/13
178.224.0.0/13
84.24.0.0/13
脚本
import sys
import json
from netaddr import *
reload(sys)
sys.setdefaultencoding('utf-8')
filename = 'results.json'
filename = unicode(filename, 'utf-8')
cidr_filename = 'cidr.txt'
rowcount = 0
count = 0
# Load CIDR ranges
with open(cidr_filename, 'r') as f:
cidr = [line.strip() for line in f]
# Load JSON line by line
with open(filename) as f:
for line in f:
output = json.loads(line)
rowcount += 1
# Match if IP is in CIDR ranges
if all_matching_cidrs(output['ip'], cidr):
if 'banner' in output['data']:
print(output['ip'] + '\t' + output['data']['banner'])
count += 1
print('---------------------------------------')
print('LINES: {rowcount}')
print('RESULTS: {count}')
print('---------------------------------------')
当前结果
解析示例100,000行的行现在需要8分钟,使用以下命令:
< Pypy
配备2.8 GHz Intel Core i7、16Gb RAM,SSD的MacBook Pro
解析完整的2000万行数据将花费惊人的26小时。
---------------------------------------
LINES: 100000
RESULTS: 1243
---------------------------------------
real 7m57.739s
user 7m52.127s
sys 0m4.177s
瓶颈是要搜索的CIDR范围的数量,当我针对1个CIDR范围运行100,000行的示例集时,只需1.2秒。 />
---------------------------------------
LINES: 100000
RESULTS: 4
---------------------------------------
real 0m1.201s
user 0m1.095s
sys 0m0.090s
是否有更快的方法来实现?多线程/多处理会加快处理速度吗?任何帮助或其他反馈将不胜感激!
我已经完成的事情:
使用Pypy,这比Python 2.7快9x(!)
尝试使用Tim Bray的Widefinder,但由于它专注于正则表达式搜索恕我直言而无法使其工作。
UPDATE
从±26小时到4.5分钟解析20,344,457行!
---------------------------------------
LINES: 20344457
RESULTS: 130863
---------------------------------------
real 4m27.661s
user 3m55.171s
sys 0m26.793s
TemporalWolf对cProfile的建议我的代码表明,确实json.loads()是瓶颈:
ncalls tottime percall cumtime percall filename:lineno(function)
16607394 131.960 0.000 131.960 0.000 {_pypyjson.loads}
按照他的建议,本机切片IP地址,而不是将每行作为JSON加载,速度提高了2.5倍!
---------------------------------------
LINES: 20344457
RESULTS: 130863
---------------------------------------
real 1m40.548s
user 1m13.885s
sys 0m22.664s
#1 楼
是否有更快的方法来实现这一目标?多线程/多处理会加快处理速度吗?任何帮助或其他反馈将不胜感激!
不,在大多数情况下,多线程对您没有任何影响。在某些时候,瓶颈应该是读取50GB文件内容的IO速度,而不是处理速度。您还需要按顺序读取文件(我认为),以与输入相同的顺序获得输出。
但是,从根本上讲,该解决方案不必具有多线程执行即可提高性能。
学习如何衡量代码各部分的性能是一项重要技能。目前,它可能只是计时此代码一样简单:
# Load JSON line by line
with open(filename) as f:
for line in f:
output = json.loads(line)
rowcount+=1
即从JSON转换每一行,并计算行数。...这有多快?我希望当IP CIDR查找也能快速运行时,整个程序也应同样快。
几乎可以肯定,您的性能问题与此行有关:
if all_matching_cidrs(output['ip'], cidr):
您的计时已支持此操作....在1个CIRD中搜索所有记录需要1秒,但对于2000个CIDR则要更长的时间...
因此,您遇到的性能问题是\ $ O(mn)\ $的顺序,其中\ $ m \ $是文件中的行数,而\ $ n \ $是CIDR的数量。
您不能提高与文件中的行数有关的性能,但是可以提高CIDR查找的成本。如果检查所有CIDR匹配项都是固定成本怎么办?这样,您的整体性能将变为\ $ O(m)\ $,而不依赖于CIDR记录的数量。
您可以通过将CIDR数据预处理为允许固定成本查询。
我将使用的结构是由表示CIDR规范中每个位的节点组成的二叉树。每个叶节点代表一个要包括的CIDR。即您将CIDR预处理到最多具有32个级别的树中(对于
/32
CIDR)。然后,对于查找,您从JSON中获取IP,将其转换为整数,并从最高有效位开始移位。对于每一位,您都开始对CIDR树进行降级,如果可以对树进行降级,直到遇到叶节点,那么您将找到匹配的CIDR。最多,这将是树下的32次迭代,但是在大多数情况下,CIDR很少那么具体。因此,我们假设最多为
/24
CIDR,这意味着您最多可以将查询减少到24个下降,而不是多达2000个完整检查。归结为算法。
更新-示例查找
注意,我将这棵树合并在一起,以支持在多个CIDR范围内更快地查找IP。 Python不是我的主要语言,因此请仔细检查它,并根据需要进行调整。具体来说,我使用了一些幼稚的机制将IP地址解析为整数。可以使用专用的库来代替。
您可以看到它在ideone上运行:https://ideone.com/cd0O2I
def parseIPPart(ipx, shift):
try:
return int(ipx) << shift
except TypeError:
return 0
def parseIP(ipString):
ips_shifts = zip(ipString.split("."), range(24, -1, -8))
addr = [parseIPPart(ip, shift) for ip, shift in ips_shifts]
return sum(addr)
def parseCIDR(cidr):
addrString, bitsString = cidr.split('/')
try:
bits = int(bitsString)
except TypeError:
bits = 32
addr = parseIP(addrString)
return addr, bits
class CIDRTree:
class CIDRNode:
def __init__(self, depth):
self.depth = depth
self.isset = None
self.unset = None
self.leaf = False
def __init__(self):
self.root = CIDRTree.CIDRNode(-1)
def addCIDR(self, cidr):
ip, bits = parseCIDR(cidr)
node = self.root
for b in range(bits):
if node.leaf:
# Previous bigger CIDR Covers this subnet
return
mask = 1 << (31 - b)
if (ip & mask) != 0:
if node.isset is None:
node.isset = CIDRTree.CIDRNode(b)
kid = node.isset
else:
if node.unset is None:
node.unset = CIDRTree.CIDRNode(b)
kid = node.unset
node = kid
# node is now a representation of the leaf that comes from this CIDR.
# Clear out any more specific CIDRs that are no longer relevant (this CIDR includes a previous CIDR)
node.isset = None
node.unset = None
node.leaf = True
#print("Added CIDR ", ip, " and bits ", bits)
def matches(self, ipString):
ip = parseIP(ipString)
node = self.root
shift = 0
while node is not None and not node.leaf:
shift += 1
mask = 1 << (32 - shift)
val = (ip & mask) != 0
node = node.isset if val else node.unset
return node is not None and node.leaf
if __name__ == "__main__":
cidrTree = CIDRTree()
cidrTree.addCIDR("8.0.0.0/8")
cidrTree.addCIDR("9.8.7.0/24")
print ("Tree matches 8.8.8.8:", cidrTree.matches("8.8.8.8"))
print ("Tree matches 9.9.9.9:", cidrTree.matches("9.9.9.9"))
print ("Tree matches 9.8.7.6:", cidrTree.matches("9.8.7.6"))
评论
\ $ \ begingroup \ $
哇,非常感谢。它的速度非常快,现在100,000行只需1.3秒,而不是7分钟:实际0m1.350s用户0m1.174s sys 0m0.146s
\ $ \ endgroup \ $
– JMK09
17年6月14日14:56在
\ $ \ begingroup \ $
我已经在完整的20,344,457行上运行了它:真正的4m27.661s用户3m55.171s sys 0m26.793s这甚至比我最初尝试的100,000条记录还要快:)
\ $ \ endgroup \ $
– JMK09
17年6月14日在15:01
\ $ \ begingroup \ $
仅供参考,我计算了一些数字,现在每秒处理约75,000条记录,即190MB / s,对于您的系统来说,这是正确的选择。 SSD应该会更快一些,但是会有一些开销,而且我希望,如果您现在有一个并发的IO /处理系统(在一个线程中计算匹配,而在另一个线程中阻塞IO),则可以节省一些时间....但可能不值得。仍有改进的空间,但这将是艰苦的工作。
\ $ \ endgroup \ $
–rolfl
17年6月14日在19:35
\ $ \ begingroup \ $
Nitpick的解释:二叉搜索树将复杂度降低为O(log n),因此最终复杂度将为O(m log n)。
\ $ \ endgroup \ $
–临时狼
17年6月14日在20:42
\ $ \ begingroup \ $
@TemporalWolf-并非完全准确。现实情况是,日志n中的n是常数32(IP地址中的位数),而不是CIDR的数目,因此,作为常数,复杂度仅为\ $ O(m)\ $
\ $ \ endgroup \ $
–rolfl
17年6月14日在21:51
#2 楼
对于正在处理的日志和CIDR文件,我看不到使用sys.setdefaultencoding()
背后的原因。 您可能有兴趣阅读:
为什么sys.setdefaultencoding()会破坏代码
为什么我们不应该使用sys.setdefaultencoding(“ utf-8”)在py脚本中?
评论
\ $ \ begingroup \ $
解析(我相信是)日语字符时遇到一些编码问题。我删除了这两行,然后再次运行脚本,它没有任何性能改进。真正的7m17.190s用户7m14.737s sys 0m1.853s
\ $ \ endgroup \ $
– JMK09
17年6月14日在12:56
#3 楼
首先,Phyton不是我选择的语言(换句话说,我一点都不了解),所以我只是想添加一些与语言无关的内容。是的,您将从这里的多线程中受益很多,这是一个非常基本的情况。我理所当然地认为,Phyton对于使用多线程不做任何复杂的事情,显然,因为即使使用基本的OS API,执行多线程本身也是一项艰巨的任务。一般而言,除非您要处理非常快速的计算,否则线程增加的额外开销会使性能恶化,所以没有必要在所有地方都不使用并行性。这是计算机编程中最简单的事情之一,它使您可以“扼杀”硬件,以至于无论如何,您都可以按照PC允许的速度运行。
因此,这里的事情之一是CPU等待您的硬盘提供数据,然后硬盘等待CPU请求某些东西。
首先也是最明显的一点是,每次访问磁盘获取几个字节的效率都非常低:磁盘(HDD或SDD相同,在这种情况下)最适合持续读取。操作系统缓存将为您提供帮助,以便它会针对您提出的每个请求提前读取大量数据,但是您知道自己要遍历整个文件,因此您不应该依赖缓存并尝试在执行时尽可能地高效您的代码。
实际上,正如rolfl指出的那样
在某些时候,瓶颈应该是读取50GB文件内容的IO速度
对他的回答的其中一项评论表明,这正是发生的事情:
我处理了一些数字,每条处理约75,000条记录
现在每秒,或190MB / s
除非是5-8岁的廉价型号,否则190 MB / s的硬盘速度太低了。现在,即使是SD卡,有时也可以更快。从一个不错的固态硬盘,我希望至少能达到两倍的速度,今天,即使是价格在100美元左右的固态硬盘,也可以轻松使SATA接口饱和。
,也就是说,您想读取大块数据每次;无论如何,不要一次读一行。没有神奇的数字,但是除非计算机出现严重的内存问题,否则每次100 MB都应该是个好数目。
现在,问题是,在等待数据时,CPU处于空闲状态,什么也不做。然后,在CPU处理数据的同时,磁盘处于空闲状态,等待做某事:不用说这是时候进行一些免费的多线程处理了。确切的项目细节和限制,因为您可以简单地使线程数等于内核数,每个线程都在文件的相同部分上工作(一小时内即可轻松完成),或者编写一个中央文件调度程序,它读取文件的块,创建达到一定限制的线程(可能需要进行一些限制)并收集结果(这可能需要一天的时间)。这完全取决于您有足够的资金和时间来做到这一点,但是,是的,去做吧。
评论
\ $ \ begingroup \ $
默认的Python实现无法使用并行性,因为它使用了全局锁:wiki.python.org/moin/GlobalInterpreterLock
\ $ \ endgroup \ $
– D. Jurcau
17年6月17日在9:36
评论
all_matching_cidrs(output ['ip'],cidr)是做什么的?如果我看一下该方法的代码,那不是检查2000万行是否匹配2000 CIDR的有效方法。它返回所有匹配项,因此即使在第一个CIDR上匹配,它也会检查400E9次。我认为您将需要实现自己的查找
我不久前问过一个类似的问题。这是关于将字符串IP地址解析为二进制格式以用于Trie数据结构,该数据结构专门用于匹配最长前缀,这似乎是您要使用CIDR进行的操作。尝试确实可以很好地完成此任务,而rolfl在改进您的代码方面所做的工作或多或少。如果您经常执行这样的操作,那么学习数据结构可能会很有用!
50GB ... JSON ...亲爱的上帝...
如果您注意到json.loads()花费大量时间并且大多数行与cidr列表不匹配,则应使用cProfile =>对代码进行概要分析,天真地对IP地址进行切片(并且处理的长度小于最大长度) )大约需要json.loads()时间的1/20,这只需要针对您正在处理的行执行...跳过的行的成本降低了95%。那有意义吗?在进行个人资料之前,您不会知道。