我正在尝试加快读取大型日志文件(JSON行,50gb +)的Python脚本的速度,并筛选出与2000个CIDR范围中的1个匹配的结果。 > 2000万行

{"ip":"xxx.xxx.xxx.xxx","timestamp":"2017-05-27T04:00:35-04:00","data":{},"error":"EOF","error_component":"banner"}
{"ip":"xxx.xxx.xxx.xxx","timestamp":"2017-05-27T04:00:35-04:00","data":{"banner":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx","ehlo":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx","starttls":"500 Unknown command\r\n"},"error":"Bad return code for STARTTLS","error_component":"starttls"}
{"ip":"xxx.xxx.xxx.xxx","timestamp":"2017-05-27T04:00:35-04:00","data":{},"error":"EOF","error_component":"banner"}
{"ip":"xxx.xxx.xxx.xxx","timestamp":"2017-05-27T04:00:35-04:00","data":{"banner":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx","ehlo":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx","starttls":"502 No such command\r\n"},"error":"Bad return code for STARTTLS","error_component":"starttls"}


CIDR文件

2000行

86.80.0.0/12
77.160.0.0/12
188.200.0.0/13
178.224.0.0/13
84.24.0.0/13


脚本

import sys
import json
from netaddr import *

reload(sys)
sys.setdefaultencoding('utf-8')

filename = 'results.json'
filename = unicode(filename, 'utf-8')
cidr_filename = 'cidr.txt'

rowcount = 0
count = 0

# Load CIDR ranges
with open(cidr_filename, 'r') as f:
    cidr = [line.strip() for line in f]

# Load JSON line by line
with open(filename) as f:
    for line in f:
        output = json.loads(line)
        rowcount += 1

        # Match if IP is in CIDR ranges
        if all_matching_cidrs(output['ip'], cidr):
            if 'banner' in output['data']:
                print(output['ip'] + '\t' + output['data']['banner'])
                count += 1

print('---------------------------------------')
print('LINES:   {rowcount}')
print('RESULTS: {count}')
print('---------------------------------------')


当前结果

解析示例100,000行的行现在需要8分钟,使用以下命令:

< Pypy
配备2.8 GHz Intel Core i7、16Gb RAM,SSD的MacBook Pro

解析完整的2000万行数据将花费惊人的26小时。

---------------------------------------
LINES:   100000
RESULTS: 1243
---------------------------------------

real    7m57.739s
user    7m52.127s
sys     0m4.177s


瓶颈是要搜索的CIDR范围的数量,当我针对1个CIDR范围运行100,000行的示例集时,只需1.2秒。 />
---------------------------------------
LINES:   100000
RESULTS: 4
---------------------------------------

real    0m1.201s
user    0m1.095s
sys     0m0.090s


是否有更快的方法来实现?多线程/多处理会加快处理速度吗?任何帮助或其他反馈将不胜感激!

我已经完成的事情:


使用Pypy,这比Python 2.7快9x(!)
尝试使用Tim Bray的Widefinder,但由于它专注于正则表达式搜索恕我直言而无法使其工作。

UPDATE

从±26小时到4.5分钟解析20,344,457行!

---------------------------------------
LINES:   20344457
RESULTS: 130863
---------------------------------------

real    4m27.661s 
user    3m55.171s 
sys     0m26.793s


TemporalWolf对cProfile的建议我的代码表明,确实json.loads()是瓶颈:

ncalls    tottime  percall  cumtime  percall filename:lineno(function)
16607394  131.960    0.000  131.960    0.000 {_pypyjson.loads}


按照他的建议,本机切片IP地址,而不是将每行作为JSON加载,速度提高了2.5倍!

---------------------------------------
LINES:   20344457
RESULTS: 130863
---------------------------------------

real    1m40.548s
user    1m13.885s
sys     0m22.664s


评论

all_matching_cidrs(output ['ip'],cidr)是做什么的?

如果我看一下该方法的代码,那不是检查2000万行是否匹配2000 CIDR的有效方法。它返回所有匹配项,因此即使在第一个CIDR上匹配,它也会检查400E9次。我认为您将需要实现自己的查找

我不久前问过一个类似的问题。这是关于将字符串IP地址解析为二进制格式以用于Trie数据结构,该数据结构专门用于匹配最长前缀,这似乎是您要使用CIDR进行的操作。尝试确实可以很好地完成此任务,而rolfl在改进您的代码方面所做的工作或多或少。如果您经常执行这样的操作,那么学习数据结构可能会很有用!

50GB ... JSON ...亲爱的上帝...

如果您注意到json.loads()花费大量时间并且大多数行与cidr列表不匹配,则应使用cProfile =>对代码进行概要分析,天真地对IP地址进行切片(并且处理的长度小于最大长度) )大约需要json.loads()时间的1/20,这只需要针对您正在处理的行执行...跳过的行的成本降低了95%。那有意义吗?在进行个人资料之前,您不会知道。

#1 楼


是否有更快的方法来实现这一目标?多线程/多处理会加快处理速度吗?任何帮助或其他反馈将不胜感激!


不,在大多数情况下,多线程对您没有任何影响。在某些时候,瓶颈应该是读取50GB文件内容的IO速度,而不是处理速度。您还需要按顺序读取文件(我认为),以与输入相同的顺序获得输出。

但是,从根本上讲,该解决方案不必具有多线程执行即可提高性能。

学习如何衡量代码各部分的性能是一项重要技能。目前,它可能只是计时此代码一样简单:


# Load JSON line by line
with open(filename) as f:
    for line in f:
        output = json.loads(line)
        rowcount+=1



即从JSON转换每一行,并计算行数。...这有多快?我希望当IP CIDR查找也能快速运行时,整个程序也应同样快。

几乎可以肯定,您的性能问题与此行有关:


if all_matching_cidrs(output['ip'], cidr):



您的计时已支持此操作....在1个CIRD中搜索所有记录需要1秒,但对于2000个CIDR则要更长的时间...

因此,您遇到的性能问题是\ $ O(mn)\ $的顺序,其中\ $ m \ $是文件中的行数,而\ $ n \ $是CIDR的数量。

您不能提高与文件中的行数有关的性能,但是可以提高CIDR查找的成本。如果检查所有CIDR匹配项都是固定成本怎么办?这样,您的整体性能将变为\ $ O(m)\ $,而不依赖于CIDR记录的数量。

您可以通过将CIDR数据预处理为允许固定成本查询。

我将使用的结构是由表示CIDR规范中每个位的节点组成的二叉树。每个叶节点代表一个要包括的CIDR。即您将CIDR预处理到最多具有32个级别的树中(对于/32 CIDR)。

然后,对于查找,您从JSON中获取IP,将其转换为整数,并从最高有效位开始移位。对于每一位,您都开始对CIDR树进行降级,如果可以对树进行降级,直到遇到叶节点,那么您将找到匹配的CIDR。最多,这将是树下的32次迭代,但是在大多数情况下,CIDR很少那么具体。因此,我们假设最多为/24 CIDR,这意味着您最多可以将查询减少到24个下降,而不是多达2000个完整检查。

归结为算法。

更新-示例查找

注意,我将这棵树合并在一起,以支持在多个CIDR范围内更快地查找IP。 Python不是我的主要语言,因此请仔细检查它,并根据需要进行调整。具体来说,我使用了一些幼稚的机制将IP地址解析为整数。可以使用专用的库来代替。

您可以看到它在ideone上运行:https://ideone.com/cd0O2I

def parseIPPart(ipx, shift):
    try:
        return int(ipx) << shift
    except TypeError:
        return 0

def parseIP(ipString):
    ips_shifts = zip(ipString.split("."), range(24, -1, -8))

    addr = [parseIPPart(ip, shift) for ip, shift in ips_shifts]
    return sum(addr)


def parseCIDR(cidr):
    addrString, bitsString = cidr.split('/')
    try:
        bits = int(bitsString)
    except TypeError:
        bits = 32
    addr = parseIP(addrString)
    return addr, bits


class CIDRTree:
    class CIDRNode:
        def __init__(self, depth):
            self.depth = depth
            self.isset = None
            self.unset = None
            self.leaf = False

    def __init__(self):
        self.root = CIDRTree.CIDRNode(-1)

    def addCIDR(self, cidr):
        ip, bits = parseCIDR(cidr)
        node = self.root
        for b in range(bits):
            if node.leaf:
                # Previous bigger CIDR Covers this subnet
                return
            mask = 1 << (31 - b)

            if (ip & mask) != 0:
                if node.isset is None:
                    node.isset = CIDRTree.CIDRNode(b)
                kid = node.isset
            else:
                if node.unset is None:
                    node.unset = CIDRTree.CIDRNode(b)
                kid = node.unset
            node = kid
        # node is now a representation of the leaf that comes from this CIDR.
        # Clear out any more specific CIDRs that are no longer relevant (this CIDR includes a previous CIDR)
        node.isset = None
        node.unset = None
        node.leaf = True
        #print("Added CIDR ", ip, " and bits ", bits)

    def matches(self, ipString):

        ip = parseIP(ipString)
        node = self.root
        shift = 0
        while node is not None and not node.leaf:
            shift += 1
            mask = 1 << (32 - shift)
            val = (ip & mask) != 0
            node = node.isset if val else node.unset

        return node is not None and node.leaf

if __name__ == "__main__":
    cidrTree = CIDRTree()
    cidrTree.addCIDR("8.0.0.0/8")
    cidrTree.addCIDR("9.8.7.0/24")

    print ("Tree matches 8.8.8.8:", cidrTree.matches("8.8.8.8"))
    print ("Tree matches 9.9.9.9:", cidrTree.matches("9.9.9.9"))
    print ("Tree matches 9.8.7.6:", cidrTree.matches("9.8.7.6"))


评论


\ $ \ begingroup \ $
哇,非常感谢。它的速度非常快,现在100,000行只需1.3秒,而不是7分钟:实际0m1.350s用户0m1.174s sys 0m0.146s
\ $ \ endgroup \ $
– JMK09
17年6月14日14:56在

\ $ \ begingroup \ $
我已经在完整的20,344,457行上运行了它:真正的4m27.661s用户3m55.171s sys 0m26.793s这甚至比我最初尝试的100,000条记录还要快:)
\ $ \ endgroup \ $
– JMK09
17年6月14日在15:01

\ $ \ begingroup \ $
仅供参考,我计算了一些数字,现在每秒处理约75,000条记录,即190MB / s,对于您的系统来说,这是正确的选择。 SSD应该会更快一些,但是会有一些开销,而且我希望,如果您现在有一个并发的IO /处理系统(在一个线程中计算匹配,而在另一个线程中阻塞IO),则可以节省一些时间....但可能不值得。仍有改进的空间,但这将是艰苦的工作。
\ $ \ endgroup \ $
–rolfl
17年6月14日在19:35

\ $ \ begingroup \ $
Nitpick的解释:二叉搜索树将复杂度降低为O(log n),因此最终复杂度将为O(m log n)。
\ $ \ endgroup \ $
–临时狼
17年6月14日在20:42

\ $ \ begingroup \ $
@TemporalWolf-并非完全准确。现实情况是,日志n中的n是常数32(IP地址中的位数),而不是CIDR的数目,因此,作为常数,复杂度仅为\ $ O(m)\ $
\ $ \ endgroup \ $
–rolfl
17年6月14日在21:51

#2 楼

对于正在处理的日志和CIDR文件,我看不到使用sys.setdefaultencoding()背后的原因。

您可能有兴趣阅读:


为什么sys.setdefaultencoding()会破坏代码
为什么我们不应该使用sys.setdefaultencoding(“ utf-8”)在py脚本中?


评论


\ $ \ begingroup \ $
解析(我相信是)日语字符时遇到一些编码问题。我删除了这两行,然后再次运行脚本,它没有任何性能改进。真正的7m17.190s用户7m14.737s sys 0m1.853s
\ $ \ endgroup \ $
– JMK09
17年6月14日在12:56



#3 楼

首先,Phyton不是我选择的语言(换句话说,我一点都不了解),所以我只是想添加一些与语言无关的内容。

是的,您将从这里的多线程中受益很多,这是一个非常基本的情况。我理所当然地认为,Phyton对于使用多线程不做任何复杂的事情,显然,因为即使使用基本的OS API,执行多线程本身也是一项艰巨的任务。一般而言,除非您要处理非常快速的计算,否则线程增加的额外开销会使性能恶化,所以没有必要在所有地方都不使用并行性。这是计算机编程中最简单的事情之一,它使您可以“扼杀”硬件,以至于无论如何,您都可以按照PC允许的速度运行。


因此,这里的事情之一是CPU等待您的硬盘提供数据,然后硬盘等待CPU请求某些东西。

首先也是最明显的一点是,每次访问磁盘获取几个字节的效率都非常低:磁盘(HDD或SDD相同,在这种情况下)最适合持续读取。操作系统缓存将为您提供帮助,以便它会针对您提出的每个请求提前读取大量数据,但是您知道自己要遍历整个文件,因此您不应该依赖缓存并尝试在执行时尽可能地高效您的代码。

实际上,正如rolfl指出的那样


在某些时候,瓶颈应该是读取50GB文件内容的IO速度


对他的回答的其中一项评论表明,这正是发生的事情:


我处理了一些数字,每条处理约75,000条记录
现在每秒,或190MB / s


除非是5-8岁的廉价型号,否则190 MB / s的硬盘速度太低了。现在,即使是SD卡,有时也可以更快。从一个不错的固态硬盘,我希望至少能达到两倍的速度,今天,即使是价格在100美元左右的固态硬盘,也可以轻松使SATA接口饱和。

,也就是说,您想读取大块数据每次;无论如何,不​​要一次读一行。没有神奇的数字,但是除非计算机出现严重的内存问题,否则每次100 MB都应该是个好数目。

现在,问题是,在等待数据时,CPU处于空闲状态,什么也不做。然后,在CPU处理数据的同时,磁盘处于空闲状态,等待做某事:不用说这是时候进行一些免费的多线程处理了。确切的项目细节和限制,因为您可以简单地使线程数等于内核数,每个线程都在文件的相同部分上工作(一小时内即可轻松完成),或者编写一个中央文件调度程序,它读取文件的块,创建达到一定限制的线程(可能需要进行一些限制)并收集结果(这可能需要一天的时间)。这完全取决于您有足够的资金和时间来做到这一点,但是,是的,去做吧。

评论


\ $ \ begingroup \ $
默认的Python实现无法使用并行性,因为它使用了全局锁:wiki.python.org/moin/GlobalInterpreterLock
\ $ \ endgroup \ $
– D. Jurcau
17年6月17日在9:36