如何从 StreamListener 处理程序 (twitter) 向电报发送消息?Tweepy & Telethon



在启动脚本后,我只发送了一条消息,之后它将挂起,不再接收来自Twitter的消息如果我去掉我包装在"0"中的代码块------------------------------"然后我会收到所有的推文,但当我试图将其发送到Telegram时,它在第一次后停止

最初没有单独的线程,因为我无法实现结果将所有内容包装在单独的线程中,但结果是相同的

我做错了什么?

from telethon import TelegramClient, events, sync
from telethon.tl.types import InputChannel
import tweepy
import yaml
import sys
import coloredlogs, logging
import asyncio
import threading
import concurrent.futures
import time
start_twitter = threading.Event()
forwardinput_channel_entities = []
forwardoutput_channels = {}
class MyStreamListener(tweepy.StreamListener):
def on_status(self, status):
user_id = status.user.id
if user_id in forwardoutput_channels:
for output_channel in forwardoutput_channels[user_id]:
message = status.text
logging.info('-------------')
logging.info(message)
# ------------------------------
try:
loop = asyncio.get_event_loop()
except Exception as e:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
logging.error(e)
pass
loop.run_until_complete(telegram_client.send_message(
output_channel['channel'], message))
# ------------------------------
def twitter_thred():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
auth = tweepy.OAuthHandler(config['twitter_consumer_api'],
config['twitter_consumer_secret'])
auth.set_access_token(config['twitter_user_api'],
config['twitter_user_secret'])
global twitter_api
twitter_api = tweepy.API(auth)
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth=twitter_api.auth, listener=myStreamListener)
start_twitter.wait()
myStream.filter(follow=forwardinput_channel_entities,
is_async=True)
def telegram_thred():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
global telegram_client
telegram_client = TelegramClient(config['session_name'],
config['api_id'],
config['api_hash'])
telegram_client.start()
for forwardto in config['forwardto_list_ids']:
for twitter_user_id in forwardto['from']:
forwardinput_channel_entities.append(str(twitter_user_id))
channels = []
for channel in telegram_client.iter_dialogs():
if channel.entity.id in forwardto['to']:
channels.append({
'channel': InputChannel(
channel.entity.id, channel.entity.access_hash),
})
forwardoutput_channels[twitter_user_id] = channels
start_twitter.set()
telegram_client.run_until_disconnected()
def start():
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
future = executor.submit(telegram_thred)
future = executor.submit(twitter_thred)
if __name__ == '__main__':
if len(sys.argv) < 2:
print(f'Usage: {sys.argv[0]} {{CONFIG_PATH}}')
sys.exit(1)
with open(sys.argv[1], 'rb') as f:
global config
config = yaml.safe_load(f)
coloredlogs.install(
fmt='%(asctime)s.%(msecs)03d %(message)s',
datefmt='%H:%M:%S')

start()

运行脚本的yml配置示例:

# telegram
api_id: *****************
api_hash: '*****************'
session_name: 'test'
# twitter
twitter_consumer_api: '*****************'
twitter_consumer_secret: '*****************'
twitter_user_api: '*****************'
twitter_user_secret: '*****************'
forwardto_list_ids:
- from:
- 0000000000    # account twitter id
to:
- 0000000000    # telegram channel id

如前所述,Tweepy还不支持异步流,因此在运行流时会阻塞事件循环。is_async使用线程方法。

现在,您应该考虑使用Tweepy的异步流分支/https://github.com/tweepy/tweepy/pull/1491.

最新更新