上传 Youtube 视频 API 时收到通知



我有一个不和谐机器人,每当某个用户上传视频时,我想发送一条消息。我看过Youtube API(https://developers.google.com/youtube/v3/docs/videos),但还没有找到如何做我想做的事情。

好吧..我每 1 分钟检查一次视频,并检查视频链接(或 id)是否不等于最后一个视频,然后将视频发布到您要发布到的特定频道中。 我使用google-api-python-client首先pip install google-api-python-client

from from googleapiclient.discovery import build

在on_ready函数中

@client.event
async def on_ready():
youtube=build('youtube','v3',developerKey='Enter your key here')
req=youtube.playlistItems().list(
playlistId='The Playlist id of the channel you want to post video i.e. the id of video playlist of the channel',
part='snippet',
maxResults=1
)
res=req.execute()
vedioid=res['items'][0]['snippet']['resourceId']['videoId']
link="https://www.youtube.com/watch?v="+vedioid
ch=await client.fetch_channel(the  channel id from where im checking for the new video)
await ch.send(link)

yt.start()#Starting tasks loop which is made below for checking every minute if the new video is equal or unequal to old video link

使任务循环检查视频

@tasks.loop(seconds=60)
async def yt():      
youtube=build('youtube','v3',developerKey='Enter your key here')
req=youtube.playlistItems().list(
playlistId='The Playlist id of the channel you want to post video i.e. the id of video playlist of the channel',
part='snippet',
maxResults=1
)
res=req.execute()
vedioid=res['items'][0]['snippet']['resourceId']['videoId']
link="https://www.youtube.com/watch?v="+vedioid
ch=await client.fetch_channel(Fetching same channel from which you are checking for the video link)
async for message in ch.history(limit=1):#looping through the channel to get  the latest message i can do this using last message also but I prefer using channel.history        
if str(link) != str(message.content):
ch2=await client.fetch_channel(the channel you want to post video to)

await ch2.send(f'@everyone,**User** just posted a vedio!Go and check it out!n{link}')

await ch.send(link2)#this is important as after posting the video the link must also be posted to the check channel so that the bot do not send other link
else:
pass

所以基本上我所做的是使用私人频道在机器人准备好后立即发布它的最新视频,因为如果机器人偶然在两者之间离线然后上线,它会发布指向该频道的链接,然后我制作为任务循环,其中我每分钟检查一次,如果该 youtube 频道的最新视频链接不等于视频链接,这意味着上传者已上传视频,因此请将视频发布到我想发布的频道中。如果它相等,则什么都不做,即pass如果您正在使用而不是像我使用频道那样检查视频,则可以使用 json 文件或数据库。它工作正常。

Youtube为此提供了一个适当的webhook api,这样你就不需要使用轮询了。 您可以在此处阅读有关它的更多信息。

您将需要一个网络服务器来接收他们的身份验证获取请求,并在发布视频时发送以下帖子请求。 此外,您需要在网络服务器运行后向他们的服务器发送 get 请求以订阅。

显示整个代码,包括如何将其连接到机器人,对于这个答案来说太多了。可以看到完整的代码,包括此 github 存储库中的机器人。 或者,我为这个问题制作了一个 youtube 教程,您可以在此处查看。

下面是 Web 服务器的代码和 python 中的 get 请求:

# Function to start the web server async
async def web_server(self):
# This is required to use decorators for routes
routes = web.RouteTableDef()
# The youtube API sends a get request for the initial authorization
@routes.get('/')
async def authenticate(request):
if 'hub.challenge' not in request.query:
return web.Response(status=400)
print("Authenticated")
challenge_response = request.query.get('hub.challenge')
return web.Response(text=challenge_response, status=200)
# The youtube API sends post requests when new videos are posted
@routes.post('/')
async def receive(request):
# Ensure the post is of the proper type
content_type = request.content_type
if content_type != 'application/atom+xml':
return web.Response(status=400)

# Read all the data in the body and convert it to a dict
body_content = await request.content.read(n=-1)
data = xmltodict.parse(body_content, 'UTF-8')

# Ensure this is a proper video and its not already been announced before
if 'entry' in data['feed'] and data['feed']['entry']['yt:videoId'] not in self.memory:
entry = data['feed']['entry']
# Add video id to memory to prevent duplicates
self.memory.add(entry['yt:videoId'])
# store wanted data in a simple dict
video_data = {
'title': entry['title'],
'video_url': entry['link']['@href'],
'channel_name': entry['author']['name'],
'channel_url': entry['author']['uri'],
'date_published': entry['published'],
'video_id': entry['yt:videoId']
}
# Trigger the new_video event with video data
self.bot.dispatch('new_video', video_data)
return web.Response(status=200)
# Create application and connect the routes
app = web.Application()
app.add_routes(routes)
# Prepare the app runner
runner = web.AppRunner(app)
await runner.setup()
# Prepare the website
self.site = web.TCPSite(runner, '127.0.0.1', 8080)
# Wait until the discord bot is fully started
await self.bot.wait_until_ready()
# Start the web server
await self.site.start()
# Send subscribe request to API
await self.subscribe()

# Function to subscribe to the website 
async def subscribe(self):
# Start web client to send post request
async with ClientSession() as session:
# Grab the ID from the channel url
channel_id = self.bot.config["target_channel"].split('/')[-1]
# Prepare the form data required to subscribe
payload = {
'hub.callback': self.bot.config['callback_url'],
'hub.mode': 'subscribe',
'hub.topic': f'https://www.youtube.com/xml/feeds/videos.xml?channel_id={channel_id}',
'hub.lease_seconds': '', # Might want to define this, max 828000 seconds
'hub.secret': '',
'hub.verify': 'async',
'hub.verify_token': ''
}
# Send post request to the API
async with session.post('https://pubsubhubbub.appspot.com/subscribe', data=payload) as response:
# if status is 202 it worked
if response.status == 202:
print("Subscribe request sent")
else:
print("Failed to subscribe")

相关内容

  • 没有找到相关文章

最新更新