我已经用Python 3编写了一个应用程序,该应用程序可以监视互联网速度,并在速度下降太低时将一条推文发送给ISP。该应用程序具有一个配置文件,可在其中配置ISP和目标速度。

该程序具有两个同时运行的线程。 SpeedTestThread使用python模块speedtest-cli以设置的时间间隔监视互联网速度。 TwitterThread使用tweepy发送包含速度测试数据的推文。

SpeedTestThread的速度低于阈值时,数据将推送到tweet_data_queue并设置全局tweetFlagTwitterThread监视tweetFlag,并在设置后从队列中检索数据并使用它生成一条推文。它们可以在我的Github存储库中找到。

speed_test.py:

import speedtest
import json
import time
import csv
import os
import threading
import queue
import tweepy
import random

config = json.load(open('config.json'))

exitFlag = 0
tweetFlag = 0
# Global queue for tweet data, shared between threads
tweet_data_queue = queue.Queue()

def main():
    error_logger = ErrorLogger(config['errorFilePath'])
    test_thread = SpeedTestThread(1, "SpeedTestThread1", error_logger)
    tweet_thread = TwitterThread(2, "TwitterThread1", error_logger)
    test_thread.start()
    tweet_thread.start()

class SpeedTestThread(threading.Thread):

    def __init__(self, thread_id, name, error_logger):
        threading.Thread.__init__(self)
        self.name = name
        self.thread_id = thread_id
        self.s = speedtest.Speedtest()
        self.targetSpeeds = config['internetSpeeds']
        self.dataLogger = ErrorLogger(config['logFilePath'])
        self.error_logger = error_logger

    def run(self):
        global exitFlag
        prevError = False
        while exitFlag == 0:
            try:
                results = self.getSpeeds()
                self.checkSpeeds(results)
                self.dataLogger.logCsv(results)
            except Exception as e:
                error = {"time": time.ctime(),
                         "error": "Unable to retrieve results",
                         "exception": e}
                self.error_logger.logError(error)
                prevError = True

            if prevError:
                self.error_logger.counter = 0
            time.sleep(config['testFreq'])

    def getSpeeds(self):
        self.s.get_best_server()
        self.s.upload()
        self.s.download()
        return self.s.results.dict()

    def checkSpeeds(self, results):
        global tweetFlag
        down = results['download']
        up = results['upload']
        ping = results['ping']
        if (down / (2**20) < self.targetSpeeds['download'] or
            up / (2**20) < self.targetSpeeds['upload'] or
                ping > self.targetSpeeds['ping']):
            print("Unnaceptable speed results:\n"
                  "Download: %s\n"
                  "Upload: %s\n"
                  "Ping: %s\n" % (down, up, ping))
            tweetFlag = 1
            tweet_data_queue.put(results)
            print("Results queued for tweet")

class TwitterThread(threading.Thread):

    def __init__(self, thread_id, name, error_logger):
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.name = name
        self.error_logger = error_logger

        # Set up tweepy with twitter API authentication
        self.apiData = config['twitterAPI']
        auth = tweepy.OAuthHandler(
            self.apiData['apiKey'], self.apiData['apiSecret'])
        auth.set_access_token(self.apiData['accessToken'],
                              self.apiData['accessTokenSecret'])
        self.twitterAPI = tweepy.API(auth)

    def run(self):
        global exitFlag
        global tweetFlag
        prevError = False
        while True:
            if exitFlag == 1:
                break
            if tweetFlag == 1:
                tweet = self.getTweet()
                try:
                    self.twitterAPI.update_status(tweet)
                    print("Tweet successful")
                except Exception as e:
                    error = {"time": time.ctime(),
                             "error": "Unable to send tweet",
                             "exception": e}
                    self.error_logger.logError(error)
                    prevError = True

                if prevError:
                    self.error_logger.counter = 0

                if tweet_data_queue.qsize() == 0:
                    tweetFlag = 0

    def getTweet(self):
        data = tweet_data_queue.get()
        down = round(data['download'] / (2**20), 2)
        up = round(data['upload'] / (2**20), 2)
        content = random.choice(config['tweetContent'])
        return content.format(config['ispTwitter'], down, up)

class Logger(object):
    def __init__(self, filepath):
        self.filepath = filepath

    def logCsv(self, data):
        print("Logging ...")
        with open(self.filepath, 'a') as f:
            writer = csv.DictWriter(f, fieldnames=data.keys())
            if os.stat(self.filepath).st_size == 0:
                writer.writeheader()
            writer.writerow(data)
        print("Done -> '%s'" % self.filepath)

class ErrorLogger(Logger):
    def __init__(self, filepath):
        Logger.__init__(self, filepath)
        self.counter = 0

    def logError(self, errorData):
        global exitFlag
        if self.counter >= config['testAttempts']:
            exitFlag = 1
            errorData['error'] = "10 Failed test attempts, exiting."
            self.counter = 0
        print(errorData['error'])
        self.logCsv(errorData)


评论

有时这会有些烦人,因为当您开始流式传输时,速度测试结果将下降:考虑随时间推移记录日志结果,并且如果最后n分钟的平均值低于X,则发送推文。否则,这可能会发送大量假阳性。

稍作调整,但您可能希望对Twitter API调用应用一些速率限制。如果超过Twitter的速率限制,则程序将崩溃。如果将self.twitterAPI = tweepy.API(auth)更改为self.twitterAPI = tweepy.API(auth,wait_on_rate_limit = True,wait_on_rate_limit_notify = True),则Tweepy应该处理此问题并在每次限制时发出警告。 >
您受到了启发吗?

我确信您的ISP会喜欢这一点,并且它会非常有效/富有成效。

在带宽最低时添加流量?嗯。

#1 楼

总的来说,这是一个很好的模块。但是,这里有一些可用性问题/问题:


运行脚本时,没有简单的方法来停止它。 CTRL + C不起作用,我必须手动终止该进程。这可能是因为threading.Thread的处理方式,但是我不确定。
如果添加一个比高音扬声器更简单的处理程序要好一些,那就太好了。例如,我可能想给自己发送电子邮件,而不是立即发推文。要解决此问题,您应该从SpeedTestThread类中删除所有有关tweeting的内容,并可能定义派生EventHandler的常规TwitterThread类,该类调用需要在派生类中定义的某些方法action(self, up, down)。这样,您可以轻松地使用不同的MailThread(EventHandler)方法定义一个action,而不是发送邮件。
您正在混合使用两种可变的命名样式camelCaselower_case_with_underscores。 Python的官方样式指南PEP8建议所有变量和函数/方法都坚持使用后者。全局常量应该在UPPER_CASE中。
对于标志,您应该只使用TrueFalse,它有点容易阅读。请注意,您可以用while not exitFlag:代替while True: if exitFlag == 1: break,用if prevError代替if prevError == 1

评论


\ $ \ begingroup \ $
非常感谢!我将研究如何更好地停止脚本。我什至没有想到添加一个通用的EventHandler类,这是一个好主意!哦,是的,我刚刚意识到我确实混合了各种命名方式。从配置中检索字典密钥时,请使用camelCase,因为它们是从JSON加载的。但是我也确实注意到了exitFlag和tweetFlag。在不增加计数器的前提下,我一定忘记了在重构为单独的错误类时将其重新添加。再次感谢!
\ $ \ endgroup \ $
– Sir_Steadman
17年7月4日在11:20

\ $ \ begingroup \ $
点号2是我的最爱。
\ $ \ endgroup \ $
– NinoŠkopac
17年7月5日在8:12

\ $ \ begingroup \ $
6.几乎所有文档注释都导致一堵密集的代码墙。
\ $ \ endgroup \ $
–轨道轻赛
17年7月5日在9:43

\ $ \ begingroup \ $
@LightnessRacesinOrbit听起来很不错;)但是,在回购中添加了文档字符串和更多的空格,这还不错,这就是为什么我在这里没有提到它。
\ $ \ endgroup \ $
–地狱
17年7月5日在10:42

\ $ \ begingroup \ $
@Graipher:哦,是的,那要好一些
\ $ \ endgroup \ $
–轨道轻赛
17年7月5日在11:15

#2 楼

这更多的是对设计/算法/体系结构的审查,而不是代码的审查。还有一些其他问题未解决的重大问题。


首先,浪费网络带宽进行重复速度测试似乎是一个坏主意。它会每小时填充一次连接(默认情况下),因此如果您当时恰巧在Internet上做某事,它将对您个人造成伤害。除此之外,它还会给您的ISP的其他用户增加拥塞。并非您的ISP的错:


"tweetContent": [
    "{0}! I'm meant to get 52mb/s down, 10mb/s up. I got {1}mb/s down, {2}mb/s up!",
    "Hey {0}, think {1}mb/s down, {2}mb/s up instead of 52mb/s down, 10mb/s up is ok - it's not!",
    "Don't break your promise {0}. {1}mb/s down, {2}mb/s up != 52mb/s down, 10mb/s up",
    "{0}, how do I Netflix as expected with {1}mb/s down, {2}mb/s instead of 52mb/s down, 10mb/s up?"
]



速度测试结果可能取决于网络其他地方的拥塞或速度测试服务器。即使在进行速度测试时,您自己的下载(或Netflix观看)出现拥塞也会减少测量值。

一条合理的消息可能是:这只是事实的简单表述,并留下了测试有问题的解释。但是人们仍然会收到消息。在某些方面,中立的事实陈述比指责诺言或抱怨Netflix更容易受到重视。

您的速度测试仅衡量其自身的带宽,而不是总带宽您与ISP的连接带宽。这样做会更好,但是需要在路由器上进行某些操作来监视流量,并需要一种从该程序查询流量的方法。 (您仍然可以使用速度测试来生成流量,并检查其是否将总流量提高到了预期的吞吐量。)被设置。不睡觉而忙碌的等待会浪费大量的CPU时间(和电力)。

我并不是很了解Python,所以我对使用什么没有任何建议,但是您肯定想要某种语言支持的同步变量/标志,该变量/标志可以让您休眠直到其他线程对其进行修改。 (您可以睡10秒钟,然后再次检查变量,但是与操作系统支持的通知相比,轮询很糟糕。)

,您根本不需要多线程。

两次速度测试之间的任何合理间隔都足够长,以使推文功能可以返回(即使超时)。如果不想在两次速度测试之间的睡眠中花费时间,可以检查调用tweet函数之前/之后的时间,并从睡眠间隔中减去那么多秒。您可以像这样的伪代码实现一个队列:它在无法鸣叫时会不断重试,与您的线程版本相同。我们可以使它变得简单,因为速度测试之间的间隔与配置的秒数无关紧要。在推文发布超时中停留一分钟是可以的。


无法发布的队列中的推文值得怀疑。为延迟的发信息保存时间戳。除非我错过了,否则它们将仅在发布时显示在Twitter上,而不是测量低速的时间。而且,如果您的连接断开了一会儿,您将在重新上网时发布大量愤怒的推文(因为结果将为零)。或者,也许您处理了速度测试导致错误而不是低速的情况,但我并没有那么仔细。

评论


\ $ \ begingroup \ $
我现在同意推文字符串过于敌对。在我的脑海中,它们原本是要变得活泼而讽刺,但那当然是行不通的-我将尽快解决。在多线程方面,起初我让它更频繁地轮询,然后才意识到这是一个坏主意。
\ $ \ endgroup \ $
– Sir_Steadman
17年7月4日在14:49

\ $ \ begingroup \ $
是的。这种口号背后的意图是,AT MOST,是关于一项商业ISP合同,每月费用为100美元或1000美元,而不是netflix所需要的。然后,您将不会使用这种音调:)此外,不断地进行速度测试也可以被视为浪费资源,使您处于不利的境地:如果您实际上一直在使用所有带宽,则可以期望被告知获得商业合同或停止合同!
\ $ \ endgroup \ $
–rackandboneman
17年7月4日在15:34



\ $ \ begingroup \ $
“实际上,您根本不需要多线程。”完全同意。
\ $ \ endgroup \ $
–迈克尔
17年7月6日在9:31

#3 楼

我只想说一件事(但IMO非常重要):

您正在提早养成不良习惯!类的重点是消除(真正)global状态,并在类和对象内对其进行管理。您确实滥用了global的用法,这会使您的代码更难于理解。 。但是,在开始时应该避免这种情况,直到您了解何时合适为止。他们的实例,等等(这通常是最清楚的,因为在可行且不太重复的情况下,它使依赖关系最明显)。线程通常很复杂,并且当两个,三个(或更多)线程以相同的值工作时,您不能期望对事件发生的顺序有直观的了解。语言,编译器,OS,处理器...都可以发挥作用,并出于速度,实用性或任何其他原因而决定修改操作顺序。

正确的方法是使用Python的共享工具(锁和朋友),或者更好的方法是交流数据而不是共享数据。

评论


\ $ \ begingroup \ $
谢谢!我必须说,让我成为那些全球人士并不适合我。但是,如果没有它,我无法找出两个线程访问控制标志的方法:/两个线程同时运行,并且都需要更改标志才能更改另一个线程的行为。您是否知道在没有全局变量的情况下如何实现这一目标?再次感谢
\ $ \ endgroup \ $
– Sir_Steadman
17年7月4日在11:59

\ $ \ begingroup \ $
查看我的编辑。我希望这是有道理的:)
\ $ \ endgroup \ $
– Grajdeanu Alex
17年7月4日在13:53

#4 楼

我想您需要仔细考虑一下用例。


停机期间应该怎么办?


您要入队吗?
是否要为此添加一条特殊消息,例如“在hh:mm处检测到中断,在hh:mm处解决”(在中断后发送时进行了调整)? br />它应该发任何推文吗?


它应该精确地测量什么?如果您的伴侣在楼上玩《反恐精英》时正在观看Netflix,则在网络状况良好的情况下,您的个人速度可能会下降。
看看是否可以考虑自己的流量。
一旦速度下降到阈值以下该怎么办?
是否要以通常的时间间隔进行测量,是否要更频繁地检查?也许您想检查的次数更多,但发的鸣叫次数更少,或者检查您的ISP的回复。
就像其他用户已经提到的那样,我将更改这条推文,以表明这是自动速度测试。如果您的机器人陷入困境,至少每个人都清楚发生了什么。


评论


\ $ \ begingroup \ $
好主意。自适应行为并让脚本“知道”中断是一个很好的主意(一旦解决了首先消耗带宽的问题……)
\ $ \ endgroup \ $
– Peter Cordes
17年7月6日在17:00

\ $ \ begingroup \ $
老实说,我什至都没有想到过停机,这确实是个好主意。您知道我应该去哪里弄清楚如何监视和考虑其他流量吗?谢谢
\ $ \ endgroup \ $
– Sir_Steadman
17年7月10日在20:25