autobahn.asyncio.wamp.ApplicationSession
。以前,我是通过修改高速公路库来实现此目的的,正如本文答案中所建议的那样。我现在需要更专业的解决方案。
经过一段时间的搜索,这篇文章看起来很有希望,但是使用了
twisted
库而不是asyncio
。我无法为asyncio
库的autobahn
分支找到类似的解决方案,因为它似乎没有使用Reactors
。我遇到的主要问题是ApplicationRunner.run()
正在阻塞(这就是为什么我以前将它外包给一个线程的原因),所以我不能在它之后再运行第二个ApplicationRunner
。 我确实需要同时访问2个websocket通道,这似乎与单个
ApplicationSession
无关。到目前为止,我的代码:
from autobahn.asyncio.wamp import ApplicationSession
from autobahn.asyncio.wamp import ApplicationRunner
from asyncio import coroutine
import time
channel1 = 'BTC_LTC'
channel2 = 'BTC_XMR'
class LTCComponent(ApplicationSession):
def onConnect(self):
self.join(self.config.realm)
@coroutine
def onJoin(self, details):
def onTicker(*args, **kwargs):
print('LTCComponent', args, kwargs)
try:
yield from self.subscribe(onTicker, channel1)
except Exception as e:
print("Could not subscribe to topic:", e)
class XMRComponent(ApplicationSession):
def onConnect(self):
self.join(self.config.realm)
@coroutine
def onJoin(self, details):
def onTicker(*args, **kwargs):
print('XMRComponent', args, kwargs)
try:
yield from self.subscribe(onTicker, channel2)
except Exception as e:
print("Could not subscribe to topic:", e)
def main():
runner = ApplicationRunner("wss://api.poloniex.com:443", "realm1", extra={})
runner.run(LTCComponent)
runner.run(XMRComponent) # <- is not being called
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
quit()
except Exception as e:
print(time.time(), e)
我对
autobahn
库的了解有限,而且恐怕文档没有太大改善。我在这里俯瞰什么吗?一个函数,一个参数,使我可以复合我的组件或一次运行它们?也许这里提供了类似的解决方案,它实现了一个替代的
ApplicationRunner
?相关主题
在扭曲状态下运行两个ApplicationSessions
在线程中运行Autobahn ApplicationRunner
Autobahn.wamp.ApplicationSession源
/>
Autobahn.wamp.Applicationrunner源
根据要求,使用
multithreading
代码@stovfl的答案进行回溯:Exception in thread Thread-2:
Traceback (most recent call last):
File "/home/nils/anaconda3/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/home/nils/git/tools/gemini_wss/t2.py", line 27, in run
self.appRunner.run(self.__ApplicationSession)
File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn- 0.14.1-py3.5.egg/autobahn/asyncio/wamp.py", line 143, in run
transport_factory = WampWebSocketClientFactory(create, url=self.url, serializers=self.serializers)
File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn- 0.14.1-py3.5.egg/autobahn/asyncio/websocket.py", line 319, in __init__
WebSocketClientFactory.__init__(self, *args, **kwargs)
File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn- 0.14.1-py3.5.egg/autobahn/asyncio/websocket.py", line 268, in __init__
self.loop = loop or asyncio.get_event_loop()
File "/home/nils/anaconda3/lib/python3.5/asyncio/events.py", line 626, in get_event_loop
return get_event_loop_policy().get_event_loop()
File "/home/nils/anaconda3/lib/python3.5/asyncio/events.py", line 572, in get_event_loop
% threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'Thread-2'.
Exception in thread Thread-1:
**Same as in Thread-2**
...
RuntimeError: There is no current event loop in thread 'Thread-1'.
#1 楼
按照您链接的方法,我设法通过asyncio设置start_loop = Falseimport asyncio
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
runner1 = ApplicationRunner(url, realm, extra={'cli_id': 1})
coro1 = runner1.run(MyApplicationSession, start_loop=False)
runner2 = ApplicationRunner(url, realm, extra={'cli_id': 2})
coro2 = runner2.run(MyApplicationSession, start_loop=False)
asyncio.get_event_loop().run_until_complete(coro1)
asyncio.get_event_loop().run_until_complete(coro2)
asyncio.get_event_loop().run_forever()
class MyApplicationSession(ApplicationSession):
def __init__(self, cfg):
super().__init__(cfg)
self.cli_id = cfg.extra['cli_id']
def onJoin(self, details):
print("session attached", self.cli_id)
获得了相同的行为
#2 楼
正如我从traceback
所看到的,我们仅达到了步骤2的4 从asyncio docs:
该模块提供了使用协程,多路复用编写单线程并发代码的基础结构通过套接字和其他资源的I / O访问
因此我使用
multithreading
放弃了我的第一个建议。我可以想象以下三个选项:
用
multiprocessing
代替multithreading
用
coroutine
内部的asyncio loop
做在
channels
中的def onJoin(self, details)
之间切换第二个建议,第一个选择使用
multiprocessing
。我可以启动两个
asyncio loops
,所以appRunner.run(...)
应该可以工作。需要传递不同的class ApplicationSession
将其添加到channel
class __ApplicationSession(ApplicationSession):
# ...
try:
yield from self.subscribe(onTicker, self.config.extra['channel'])
except Exception as e:
# ...
import multiprocessing as mp
import time
def ApplicationRunner_process(realm, channel):
appRunner = ApplicationRunner("wss://api.poloniex.com:443", realm, extra={'channel': channel})
appRunner.run(__ApplicationSession)
if __name__ == "__main__":
AppRun = [{'process':None, 'channel':'BTC_LTC'},
{'process': None, 'channel': 'BTC_XMR'}]
for app in AppRun:
app['process'] = mp.Process(target = ApplicationRunner_process, args = ('realm1', app['channel'] ))
app['process'].start()
time.sleep(0.1)
AppRun[0]['process'].join()
AppRun[1]['process'].join()
评论
嗯,你是对的-那句话没有多大意义。 run()方法不会阻止。我指的是覆盖信号调用并在线程中启动单独的事件循环的解决方案。编辑时可能迷路了。
– Deepbrook
17年3月15日在7:43
不幸的是,由于线程中没有当前事件循环,这仍然引发RuntimeError。
– Deepbrook
17年3月15日在7:54
我们非常接近-代码可以运行,但是当我尝试连接到第二个通道时,似乎出现了错误消息。 2017-03-16T08:07:33 wamp.error.not_authorized:内部错误:尚未实现的领域自动激活是对此的响应。这是代码问题还是API问题?谷歌搜索没有产生有意义的结果。
– Deepbrook
17年3月16日在7:10
更新了我的答案以涵盖此内容。请评论您使用的是一类ApplicationSession还是其他类?
– stovfl
17年3月16日在9:15
只是ApplicationSession的一个类定义。使用extras属性可以正常工作。
– Deepbrook
17年3月16日在9:37