Tweepy stream socket can't send preprocessed text

I have two programs connected through a socket. One is a tweepy StreamListener, in which I also preprocess the data with the library "tweet-preprocessor". The other program is supposed to connect to that socket and analyze the data with Spark Structured Streaming. The problem is that when I preprocess the data before sending it, Spark never gets a batch.

Here is the stream listener:

import tweepy
import socket
import json
import preprocessor as p

CONSUMER_KEY = ""
CONSUMER_SECRET = ""
ACCESS_TOKEN = ""
ACCESS_TOKEN_SECRET = ""

auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

p.set_options(p.OPT.URL, p.OPT.EMOJI, p.OPT.SMILEY)

class MyStreamListener(tweepy.StreamListener):
    def __init__(self, csocket):
        self.client_socket = csocket

    def on_data(self, raw_data):
        try:
            data = json.loads(raw_data)
            clean_text = p.clean(data["text"])
            print(clean_text)
            self.client_socket.send(clean_text.encode("utf-8"))
            return True
        except BaseException as e:
            print("Error: " + str(e))
            return True

    def on_error(self, status_code):
        print(status_code)
        return True

skt = socket.socket()
host = "localhost"
port = 5555
skt.bind((host, port))
skt.listen()

client, address = skt.accept()

myStreamListener = MyStreamListener(csocket=client)
myStream = tweepy.Stream(auth=auth, listener=myStreamListener)
myStream.filter(track=["Trump"], languages=["en"])

And the simple Spark code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, size
spark = SparkSession.builder.appName("TwitterSpark").getOrCreate()
lines = spark.readStream.format("socket").option("host", "localhost").option("port", 5555).load()
#tweetlength = lines.select(
#        size(split(lines.value, " ")).alias("tweetlength")
#)
query = lines.writeStream.outputMode("update").format("console").start()
query.awaitTermination()
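The commented-out query would compute the word count of each tweet by splitting the line on spaces and taking the size of the resulting array. As a minimal sketch of what that transformation does per line (the sample string and the helper name `tweet_length` are hypothetical, not part of the original code), the plain-Python equivalent is:

```python
# Plain-Python equivalent of size(split(lines.value, " ")):
# split an incoming line on single spaces and count the tokens.
def tweet_length(value: str) -> int:
    return len(value.split(" "))

print(tweet_length("Just a short example tweet"))  # 5
```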

Most likely clean_text has no trailing newline ("\n") at the end. Unlike print(clean_text), which appends a newline automatically, socket.send() transmits the clean_text.encode("utf-8") bytes as-is, so you need to append the "\n" explicitly:

self.client_socket.send((clean_text + "\n").encode("utf-8"))

Without a "\n" to separate lines in the socket data, Spark treats the input as one ever-growing line, unless the tweet text itself happens to contain newlines.
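The framing problem can be demonstrated without Spark or Twitter at all: any line-oriented reader on the receiving end only yields a record once it sees the "\n" delimiter. A minimal sketch with a local socket pair (the payload strings are made up for illustration):

```python
import socket

# A connected pair of sockets standing in for the listener and Spark.
left, right = socket.socketpair()

# Without a trailing "\n", these bytes arrive but close no record yet.
left.send(b"first tweet")
# Each "\n" terminates one record for the line-oriented reader.
left.send(b" continued\nsecond tweet\n")
left.close()  # EOF, so the reader knows the stream is done

# Read the received bytes line by line, as a newline-delimited protocol does.
reader = right.makefile("r", encoding="utf-8")
records = [line.rstrip("\n") for line in reader]
right.close()

print(records)  # ['first tweet continued', 'second tweet']
```

Note that "first tweet" and " continued" end up in the same record: the record boundary is defined purely by the delimiter, not by individual send() calls, which is why the un-terminated clean_text never forms a complete line for Spark.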
