为什么我会收到属性错误:'NoneType'对象没有属性'send'以及如何避免它?



运行scrapeBackend.py时出现以下错误

AttributeError: 'NoneType' object has no attribute 'send'

我试图使用从customKafka.py中的ProducerConsumer类继承的send_topic方法

import time
import pymongo
from pymongo import MongoClient
from customKafka import ProducerConsumer

class scrapeBackend(ProducerConsumer):
def run(self):
ARTICLE_SCRAPED_KAFKA_TOPIC = "raw_text"
ARTICLE_LIMIT = 20
print("Generating raw article")
print("generating every 3 seconds")
for i in range(1,ARTICLE_LIMIT):
data={
"article_id": i,
"source" : "bbc",
"article" : "this is an article"
}
super().send_topic("raw_text",data)
#collection.insert_one(data)
print(f"done sending {i}")
time.sleep(3)
scrap1=scrapeBackend(True,False,"raw_text","localhost:29092")
scrap1.run()

customKafka.py

from kafka import KafkaConsumer
from kafka import KafkaProducer
import json
class ProducerConsumer:
__isProducer=None
__isConsumer=None
__producer=None
__consumer=None
def __init__(self,isProducer,isConsumer,topic,bootstrap_servers):
if (self.__isProducer):
self.__producer=KafkaProducer(bootstrap_servers=bootstrap_servers)
if (self.__isConsumer):
self.__consumer= KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest',
enable_auto_commit=True,
consumer_timeout_ms = 10000)
def get_producer(self):
return self.__producer
def get_consumer(self):
return self.__consumer
def send_topic(self,topic,data):
self.get_producer().send(topic, json.dumps(data).encode("utf-8"))

看起来像getProducer()返回None,但是它应该返回一些东西,因为我在运行

之前进行了初始化

问题在于父类的__init__函数。

应该这样实现:

def __init__(self,isProducer,isConsumer,topic,bootstrap_servers):
if isProducer:
self.__producer=KafkaProducer(bootstrap_servers=bootstrap_servers)
if isConsumer:
self.__consumer= KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest',
enable_auto_commit=True,
consumer_timeout_ms = 10000)

注意if语句中的区别。希望这对你有帮助!

你应该在子类的__init__中调用super(),然后你应该能够从对象scrap1中调用函数send_topic

class scrapeBackend(ProducerConsumer):
def __init__(self, *args, **kwargs):
super(scrapeBackend, self).__init__(*args, **kwargs)
def run(self):
ARTICLE_SCRAPED_KAFKA_TOPIC = "raw_text"
ARTICLE_LIMIT = 20
print("Generating raw article")
print("generating every 3 seconds")
for i in range(1,ARTICLE_LIMIT):
data={
"article_id": i,
"source" : "bbc",
"article" : "this is an article"
}
self.send_topic("raw_text", data)
#collection.insert_one(data)
print(f"done sending {i}")
time.sleep(3)

ProductConsumer__init__不能处理输入参数(两个if条件),它应该这样实现:

class ProducerConsumer:
def __init__(self,isProducer,isConsumer,topic,bootstrap_servers):
if isProducer:
self.__producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
if isConsumer:
self.__consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest',
enable_auto_commit=True,
consumer_timeout_ms = 10000)
def get_producer(self):
return self.__producer
def get_consumer(self):
return self.__consumer
def send_topic(self,topic,data):
self.get_producer().send(topic, json.dumps(data).encode("utf-8"))