该程序具有两个同时运行的线程。
SpeedTestThread
使用python模块speedtest-cli以设置的时间间隔监视互联网速度。 TwitterThread
使用tweepy发送包含速度测试数据的推文。 当
SpeedTestThread
的速度低于阈值时,数据将推送到tweet_data_queue并设置全局tweetFlag
。 TwitterThread
监视tweetFlag
,并在设置后从队列中检索数据并使用它生成一条推文。它们可以在我的Github存储库中找到。speed_test.py:
import speedtest
import json
import time
import csv
import os
import threading
import queue
import tweepy
import random
config = json.load(open('config.json'))
exitFlag = 0
tweetFlag = 0
# Global queue for tweet data, shared between threads
tweet_data_queue = queue.Queue()
def main():
error_logger = ErrorLogger(config['errorFilePath'])
test_thread = SpeedTestThread(1, "SpeedTestThread1", error_logger)
tweet_thread = TwitterThread(2, "TwitterThread1", error_logger)
test_thread.start()
tweet_thread.start()
class SpeedTestThread(threading.Thread):
def __init__(self, thread_id, name, error_logger):
threading.Thread.__init__(self)
self.name = name
self.thread_id = thread_id
self.s = speedtest.Speedtest()
self.targetSpeeds = config['internetSpeeds']
self.dataLogger = ErrorLogger(config['logFilePath'])
self.error_logger = error_logger
def run(self):
global exitFlag
prevError = False
while exitFlag == 0:
try:
results = self.getSpeeds()
self.checkSpeeds(results)
self.dataLogger.logCsv(results)
except Exception as e:
error = {"time": time.ctime(),
"error": "Unable to retrieve results",
"exception": e}
self.error_logger.logError(error)
prevError = True
if prevError:
self.error_logger.counter = 0
time.sleep(config['testFreq'])
def getSpeeds(self):
self.s.get_best_server()
self.s.upload()
self.s.download()
return self.s.results.dict()
def checkSpeeds(self, results):
global tweetFlag
down = results['download']
up = results['upload']
ping = results['ping']
if (down / (2**20) < self.targetSpeeds['download'] or
up / (2**20) < self.targetSpeeds['upload'] or
ping > self.targetSpeeds['ping']):
print("Unnaceptable speed results:\n"
"Download: %s\n"
"Upload: %s\n"
"Ping: %s\n" % (down, up, ping))
tweetFlag = 1
tweet_data_queue.put(results)
print("Results queued for tweet")
class TwitterThread(threading.Thread):
def __init__(self, thread_id, name, error_logger):
threading.Thread.__init__(self)
self.thread_id = thread_id
self.name = name
self.error_logger = error_logger
# Set up tweepy with twitter API authentication
self.apiData = config['twitterAPI']
auth = tweepy.OAuthHandler(
self.apiData['apiKey'], self.apiData['apiSecret'])
auth.set_access_token(self.apiData['accessToken'],
self.apiData['accessTokenSecret'])
self.twitterAPI = tweepy.API(auth)
def run(self):
global exitFlag
global tweetFlag
prevError = False
while True:
if exitFlag == 1:
break
if tweetFlag == 1:
tweet = self.getTweet()
try:
self.twitterAPI.update_status(tweet)
print("Tweet successful")
except Exception as e:
error = {"time": time.ctime(),
"error": "Unable to send tweet",
"exception": e}
self.error_logger.logError(error)
prevError = True
if prevError:
self.error_logger.counter = 0
if tweet_data_queue.qsize() == 0:
tweetFlag = 0
def getTweet(self):
data = tweet_data_queue.get()
down = round(data['download'] / (2**20), 2)
up = round(data['upload'] / (2**20), 2)
content = random.choice(config['tweetContent'])
return content.format(config['ispTwitter'], down, up)
class Logger(object):
def __init__(self, filepath):
self.filepath = filepath
def logCsv(self, data):
print("Logging ...")
with open(self.filepath, 'a') as f:
writer = csv.DictWriter(f, fieldnames=data.keys())
if os.stat(self.filepath).st_size == 0:
writer.writeheader()
writer.writerow(data)
print("Done -> '%s'" % self.filepath)
class ErrorLogger(Logger):
def __init__(self, filepath):
Logger.__init__(self, filepath)
self.counter = 0
def logError(self, errorData):
global exitFlag
if self.counter >= config['testAttempts']:
exitFlag = 1
errorData['error'] = "10 Failed test attempts, exiting."
self.counter = 0
print(errorData['error'])
self.logCsv(errorData)
#1 楼
总的来说,这是一个很好的模块。但是,这里有一些可用性问题/问题:运行脚本时,没有简单的方法来停止它。 CTRL + C不起作用,我必须手动终止该进程。这可能是因为
threading.Thread
的处理方式,但是我不确定。如果添加一个比高音扬声器更简单的处理程序要好一些,那就太好了。例如,我可能想给自己发送电子邮件,而不是立即发推文。要解决此问题,您应该从
SpeedTestThread
类中删除所有有关tweeting的内容,并可能定义派生EventHandler
的常规TwitterThread
类,该类调用需要在派生类中定义的某些方法action(self, up, down)
。这样,您可以轻松地使用不同的MailThread(EventHandler)
方法定义一个action
,而不是发送邮件。您正在混合使用两种可变的命名样式
camelCase
和lower_case_with_underscores
。 Python的官方样式指南PEP8建议所有变量和函数/方法都坚持使用后者。全局常量应该在UPPER_CASE
中。对于标志,您应该只使用
True
和False
,它有点容易阅读。请注意,您可以用while not exitFlag:
代替while True: if exitFlag == 1: break
,用if prevError
代替if prevError == 1
。评论
\ $ \ begingroup \ $
非常感谢!我将研究如何更好地停止脚本。我什至没有想到添加一个通用的EventHandler类,这是一个好主意!哦,是的,我刚刚意识到我确实混合了各种命名方式。从配置中检索字典密钥时,请使用camelCase,因为它们是从JSON加载的。但是我也确实注意到了exitFlag和tweetFlag。在不增加计数器的前提下,我一定忘记了在重构为单独的错误类时将其重新添加。再次感谢!
\ $ \ endgroup \ $
– Sir_Steadman
17年7月4日在11:20
\ $ \ begingroup \ $
点号2是我的最爱。
\ $ \ endgroup \ $
– NinoŠkopac
17年7月5日在8:12
\ $ \ begingroup \ $
6.几乎所有文档注释都导致一堵密集的代码墙。
\ $ \ endgroup \ $
–轨道轻赛
17年7月5日在9:43
\ $ \ begingroup \ $
@LightnessRacesinOrbit听起来很不错;)但是,在回购中添加了文档字符串和更多的空格,这还不错,这就是为什么我在这里没有提到它。
\ $ \ endgroup \ $
–地狱
17年7月5日在10:42
\ $ \ begingroup \ $
@Graipher:哦,是的,那要好一些
\ $ \ endgroup \ $
–轨道轻赛
17年7月5日在11:15
#2 楼
这更多的是对设计/算法/体系结构的审查,而不是代码的审查。还有一些其他问题未解决的重大问题。首先,浪费网络带宽进行重复速度测试似乎是一个坏主意。它会每小时填充一次连接(默认情况下),因此如果您当时恰巧在Internet上做某事,它将对您个人造成伤害。除此之外,它还会给您的ISP的其他用户增加拥塞。并非您的ISP的错:
"tweetContent": [
"{0}! I'm meant to get 52mb/s down, 10mb/s up. I got {1}mb/s down, {2}mb/s up!",
"Hey {0}, think {1}mb/s down, {2}mb/s up instead of 52mb/s down, 10mb/s up is ok - it's not!",
"Don't break your promise {0}. {1}mb/s down, {2}mb/s up != 52mb/s down, 10mb/s up",
"{0}, how do I Netflix as expected with {1}mb/s down, {2}mb/s instead of 52mb/s down, 10mb/s up?"
]
速度测试结果可能取决于网络其他地方的拥塞或速度测试服务器。即使在进行速度测试时,您自己的下载(或Netflix观看)出现拥塞也会减少测量值。
一条合理的消息可能是:这只是事实的简单表述,并留下了测试有问题的解释。但是人们仍然会收到消息。在某些方面,中立的事实陈述比指责诺言或抱怨Netflix更容易受到重视。
您的速度测试仅衡量其自身的带宽,而不是总带宽您与ISP的连接带宽。这样做会更好,但是需要在路由器上进行某些操作来监视流量,并需要一种从该程序查询流量的方法。 (您仍然可以使用速度测试来生成流量,并检查其是否将总流量提高到了预期的吞吐量。)被设置。不睡觉而忙碌的等待会浪费大量的CPU时间(和电力)。
我并不是很了解Python,所以我对使用什么没有任何建议,但是您肯定想要某种语言支持的同步变量/标志,该变量/标志可以让您休眠直到其他线程对其进行修改。 (您可以睡10秒钟,然后再次检查变量,但是与操作系统支持的通知相比,轮询很糟糕。)
,您根本不需要多线程。
两次速度测试之间的任何合理间隔都足够长,以使推文功能可以返回(即使超时)。如果不想在两次速度测试之间的睡眠中花费时间,可以检查调用tweet函数之前/之后的时间,并从睡眠间隔中减去那么多秒。您可以像这样的伪代码实现一个队列:它在无法鸣叫时会不断重试,与您的线程版本相同。我们可以使它变得简单,因为速度测试之间的间隔与配置的秒数无关紧要。在推文发布超时中停留一分钟是可以的。
无法发布的队列中的推文值得怀疑。为延迟的发信息保存时间戳。除非我错过了,否则它们将仅在发布时显示在Twitter上,而不是测量低速的时间。而且,如果您的连接断开了一会儿,您将在重新上网时发布大量愤怒的推文(因为结果将为零)。或者,也许您处理了速度测试导致错误而不是低速的情况,但我并没有那么仔细。
评论
\ $ \ begingroup \ $
我现在同意推文字符串过于敌对。在我的脑海中,它们原本是要变得活泼而讽刺,但那当然是行不通的-我将尽快解决。在多线程方面,起初我让它更频繁地轮询,然后才意识到这是一个坏主意。
\ $ \ endgroup \ $
– Sir_Steadman
17年7月4日在14:49
\ $ \ begingroup \ $
是的。这种口号背后的意图是,AT MOST,是关于一项商业ISP合同,每月费用为100美元或1000美元,而不是netflix所需要的。然后,您将不会使用这种音调:)此外,不断地进行速度测试也可以被视为浪费资源,使您处于不利的境地:如果您实际上一直在使用所有带宽,则可以期望被告知获得商业合同或停止合同!
\ $ \ endgroup \ $
–rackandboneman
17年7月4日在15:34
\ $ \ begingroup \ $
“实际上,您根本不需要多线程。”完全同意。
\ $ \ endgroup \ $
–迈克尔
17年7月6日在9:31
#3 楼
我只想说一件事(但IMO非常重要):您正在提早养成不良习惯!类的重点是消除(真正)
global
状态,并在类和对象内对其进行管理。您确实滥用了global
的用法,这会使您的代码更难于理解。 。但是,在开始时应该避免这种情况,直到您了解何时合适为止。他们的实例,等等(这通常是最清楚的,因为在可行且不太重复的情况下,它使依赖关系最明显)。线程通常很复杂,并且当两个,三个(或更多)线程以相同的值工作时,您不能期望对事件发生的顺序有直观的了解。语言,编译器,OS,处理器...都可以发挥作用,并出于速度,实用性或任何其他原因而决定修改操作顺序。正确的方法是使用Python的共享工具(锁和朋友),或者更好的方法是交流数据而不是共享数据。
评论
\ $ \ begingroup \ $
谢谢!我必须说,让我成为那些全球人士并不适合我。但是,如果没有它,我无法找出两个线程访问控制标志的方法:/两个线程同时运行,并且都需要更改标志才能更改另一个线程的行为。您是否知道在没有全局变量的情况下如何实现这一目标?再次感谢
\ $ \ endgroup \ $
– Sir_Steadman
17年7月4日在11:59
\ $ \ begingroup \ $
查看我的编辑。我希望这是有道理的:)
\ $ \ endgroup \ $
– Grajdeanu Alex
17年7月4日在13:53
#4 楼
我想您需要仔细考虑一下用例。停机期间应该怎么办?
您要入队吗?
是否要为此添加一条特殊消息,例如“在
hh:mm
处检测到中断,在hh:mm
处解决”(在中断后发送时进行了调整)? br />它应该发任何推文吗?它应该精确地测量什么?如果您的伴侣在楼上玩《反恐精英》时正在观看Netflix,则在网络状况良好的情况下,您的个人速度可能会下降。
看看是否可以考虑自己的流量。
一旦速度下降到阈值以下该怎么办?
是否要以通常的时间间隔进行测量,是否要更频繁地检查?也许您想检查的次数更多,但发的鸣叫次数更少,或者检查您的ISP的回复。
就像其他用户已经提到的那样,我将更改这条推文,以表明这是自动速度测试。如果您的机器人陷入困境,至少每个人都清楚发生了什么。
评论
\ $ \ begingroup \ $
好主意。自适应行为并让脚本“知道”中断是一个很好的主意(一旦解决了首先消耗带宽的问题……)
\ $ \ endgroup \ $
– Peter Cordes
17年7月6日在17:00
\ $ \ begingroup \ $
老实说,我什至都没有想到过停机,这确实是个好主意。您知道我应该去哪里弄清楚如何监视和考虑其他流量吗?谢谢
\ $ \ endgroup \ $
– Sir_Steadman
17年7月10日在20:25
评论
有时这会有些烦人,因为当您开始流式传输时,速度测试结果将下降:考虑随时间推移记录日志结果,并且如果最后n分钟的平均值低于X,则发送推文。否则,这可能会发送大量假阳性。稍作调整,但您可能希望对Twitter API调用应用一些速率限制。如果超过Twitter的速率限制,则程序将崩溃。如果将self.twitterAPI = tweepy.API(auth)更改为self.twitterAPI = tweepy.API(auth,wait_on_rate_limit = True,wait_on_rate_limit_notify = True),则Tweepy应该处理此问题并在每次限制时发出警告。 >
您受到了启发吗?
我确信您的ISP会喜欢这一点,并且它会非常有效/富有成效。
在带宽最低时添加流量?嗯。