Unclear error message from pandas upload to a Google BigQuery table



Situation

I'm trying to upload a pandas DataFrame of Twitter API data to a table in BigQuery.

Here is my DataFrame preparation code, from a Google Colab notebook:

!pip install --upgrade google-cloud-language
!pip install pandas-gbq -U
from google.colab import files
uploaded = files.upload()
for fn in uploaded.keys():
    print('User uploaded file "{name}" with length {length} bytes'.format(name=fn, length=len(uploaded[fn])))
import os
# Imports Credential File:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "pp-004a-d61bf3451d85.json"
print("Service Account Key: {}".format(os.environ["GOOGLE_APPLICATION_CREDENTIALS"]))
!pip install --upgrade tweepy
# VARIABLES
interval = "15"
start = '2022-04-07'
end = '2022-04-12'
# Tweepy
searchQ = '(max muncy) -is:retweet lang:en'
intval_tw = "{}T".format(interval)
start_tw = '{}T00:00:00Z'.format(start)
end_tw   = '{}T23:59:59Z'.format(end)
# index = pd.date_range('1/1/2000', periods=9, freq='T')
# D = calendar day frequency, H = hourly frequency, T, min = minutely frequency
# Library installs
import tweepy
# from twitter_authentication import bearer_token
import time
import pandas as pd
import requests
import json
import numpy as np
bearer_token = "BEARER_TOKEN"

client = tweepy.Client(bearer_token, wait_on_rate_limit=True)
# NEED TO ENSURE HAVE ALL PARAMETERS
gathered_tweets = []
for response in tweepy.Paginator(client.search_recent_tweets,
                                 query=searchQ,
                                 user_fields=['name', 'description', 'username', 'profile_image_url', 'url', 'pinned_tweet_id', 'verified', 'created_at', 'location', 'public_metrics', 'entities'],
                                 tweet_fields=['public_metrics', 'created_at', 'lang', 'attachments', 'context_annotations', 'conversation_id', 'entities', 'geo', 'in_reply_to_user_id', 'possibly_sensitive', 'referenced_tweets', 'reply_settings', 'source'],
                                 media_fields=['duration_ms', 'media_key', 'preview_image_url', 'type', 'url', 'height', 'width', 'public_metrics'],
                                 expansions=['author_id', 'attachments.media_keys', 'entities.mentions.username', 'geo.place_id', 'in_reply_to_user_id', 'referenced_tweets.id', 'referenced_tweets.id.author_id'],
                                 start_time=start_tw,
                                 end_time=end_tw,
                                 max_results=100):
    time.sleep(1)
    gathered_tweets.append(response)

result = []
user_dict = {}
# Loop through each response object
for response in gathered_tweets:
    # Take all of the users, and put them into a dictionary of dictionaries with the info we want to keep
    for user in response.includes['users']:
        user_dict[user.id] = {'username': user.username,
                              'created_at': user.created_at,
                              'location': user.location,
                              'verified': user.verified,
                              'name': user.name,
                              'description': user.description,
                              'url': user.url,
                              'profile_image_url': user.profile_image_url,
                              'pinned_tweet': user.pinned_tweet_id,
                              'entities': user.entities,
                              'followers': user.public_metrics['followers_count'],
                              'total_tweets': user.public_metrics['tweet_count'],
                              'following': user.public_metrics['following_count'],
                              'listed': user.public_metrics['listed_count'],
                              'tweets': user.public_metrics['tweet_count']
                              }
    for tweet in response.data:
        # For each tweet, find the author's information
        author_info = user_dict[tweet.author_id]
        # Put all of the information we want to keep in a single dictionary for each tweet
        result.append({'author_id': tweet.author_id,
                       'username': author_info['username'],
                       'name': author_info['name'],
                       'author_followers': author_info['followers'],
                       'author_following': author_info['following'],
                       'author_tweets': author_info['tweets'],
                       'author_description': author_info['description'],
                       'author_url': author_info['url'],
                       'profile_image_url': author_info['profile_image_url'],
                       # 'pinned_tweet': author_info['pinned_tweet_id'], https://developer.twitter.com/en/docs/twitter-api/tweets/lookup/api-reference/get-tweets
                       # 'total_tweets': author_info['tweet_count'],
                       # 'listed_count': author_info['listed_count'],
                       'entities': author_info['entities'],
                       'verified': author_info['verified'],
                       'account_created_at': author_info['created_at'],
                       'text': tweet.text,
                       'created_at': tweet.created_at,
                       'lang': tweet.lang,
                       'tweet_id': tweet.id,
                       'retweets': tweet.public_metrics['retweet_count'],
                       'replies': tweet.public_metrics['reply_count'],
                       'likes': tweet.public_metrics['like_count'],
                       'quotes': tweet.public_metrics['quote_count'],
                       'replied': tweet.in_reply_to_user_id,
                       'sensitive': tweet.possibly_sensitive,
                       'referenced_tweets': tweet.referenced_tweets,
                       'reply_settings': tweet.reply_settings,
                       'source': tweet.source
                       # 'video_views': tweet.public_metrics['view_count']
                       })
dfTW00 = pd.DataFrame(result)
dfTW01 = dfTW00
# Create 'engagement' metric
dfTW01['engagement'] = dfTW01['retweets'] + dfTW01['replies'] + dfTW01['likes'] + dfTW01['quotes']
# Add 'tweets' column with value of 1
dfTW01['tweets'] = 1
# Engagement Rate calc
dfTW01['eng_rate'] = (dfTW01['tweets'] / dfTW01['engagement'])
# Add twitter link
dfTW01['base_url'] = 'https://twitter.com/twitter/status/'
# base_url = 'https://twitter.com/twitter/status/'
dfTW01['tweet_link'] = dfTW01['base_url'] + dfTW01['tweet_id'].astype(str)
# Imports the Google Cloud client library
from google.cloud import language_v1
# Instantiates a client
client = language_v1.LanguageServiceClient()

def get_sentiment(text):
    # The text to analyze
    document = language_v1.Document(
        content=text,
        type_=language_v1.types.Document.Type.PLAIN_TEXT
    )
    # Detects the sentiment of the text
    sentiment = client.analyze_sentiment(
        request={"document": document}
    ).document_sentiment
    return sentiment

dfTW01["sentiment"] = dfTW01["text"].apply(get_sentiment)
dfTW02 = dfTW01['sentiment'].astype(str).str.split(expand=True)
dfTW02
dfTW03 = pd.merge(dfTW01, dfTW02, left_index=True, right_index=True)
dfTW03.rename(columns = {1:'magnitude', 3:'score'}, inplace=True)
cols = ['magnitude', 'score']
dfTW03[cols] = dfTW03[cols].apply(pd.to_numeric, errors='coerce', axis=1)
def return_status(x):
    if x >= .5:
        return 'Positive'
    elif x <= -.5:
        return 'Negative'
    return 'Neutral'
dfTW03['sentiment2'] = dfTW03['score'].apply(return_status)

What I've tried

Here is what I used to upload (I have confirmed that the project, dataset, and table information is correct):

df.to_gbq('004a01.004a-TW-01',
          'pp-004a',
          chunksize=None,
          if_exists='append'
          )

Result

However, that method returns the following error message:

TypeError: '<' not supported between instances of 'int' and 'str'

Assessment

I found several posts on SO about this, but I haven't been able to relate them to my situation. (My understanding is that a variety of data types can be uploaded to a BigQuery table.)

First of all, I'm not clear on what the '<' not supported between instances of 'int' and 'str' error message means in this context.
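(From what I can tell, this is the generic error Python raises whenever an int and a str are compared with the < operator, as in the minimal snippet below; what I don't see is where that comparison would be happening during the upload.)

# Minimal reproduction of the same TypeError, independent of BigQuery:
1 < 'a'  # TypeError: '<' not supported between instances of 'int' and 'str'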

Any input would be greatly appreciated.

In case it's useful, below are the pandas dtypes in my DataFrame.

DataFrame dtypes

Pandas DataFrame dtypes:

author_id                           int64
username                           object
name                               object
author_followers                    int64
author_following                    int64
author_tweets                       int64
author_description                 object
author_url                         object
profile_image_url                  object
entities                           object
verified                             bool
account_created_at    datetime64[ns, UTC]
text                               object
created_at            datetime64[ns, UTC]
lang                               object
tweet_id                            int64
retweets                            int64
replies                             int64
likes                               int64
quotes                              int64
replied                           float64
sensitive                            bool
referenced_tweets                  object
reply_settings                     object
source                             object
engagement                          int64
tweets                              int64
eng_rate                          float64
base_url                           object
tweet_link                         object
sentiment                          object
0                                  object
magnitude                         float64
2                                  object
score                             float64
sentiment_rating                  float64
sentiment2                         object
dtype: object

When loading a DataFrame into BigQuery, you can try using the load_table_from_dataframe() function from the BigQuery library instead of pandas' to_gbq() function.

See the following Python code example using load_table_from_dataframe():

import datetime
from google.cloud import bigquery
import pandas
import pytz
# Construct a BigQuery client object.
client = bigquery.Client()
# TODO(developer): Set table_id to the ID of the table to create.
table_id = "my-project.my-dataset.my-table"
records = [
    {
        "title": "The Meaning of Life",
        "release_year": 1983,
        "length_minutes": 112.5,
        "release_date": pytz.timezone("Europe/Paris")
        .localize(datetime.datetime(1983, 5, 9, 13, 0, 0))
        .astimezone(pytz.utc),
        # Assume UTC timezone when a datetime object contains no timezone.
        "dvd_release": datetime.datetime(2002, 1, 22, 7, 0, 0),
    },
    {
        "title": "Monty Python and the Holy Grail",
        "release_year": 1975,
        "length_minutes": 91.5,
        "release_date": pytz.timezone("Europe/London")
        .localize(datetime.datetime(1975, 4, 9, 23, 59, 2))
        .astimezone(pytz.utc),
        "dvd_release": datetime.datetime(2002, 7, 16, 9, 0, 0),
    },
    {
        "title": "Life of Brian",
        "release_year": 1979,
        "length_minutes": 94.25,
        "release_date": pytz.timezone("America/New_York")
        .localize(datetime.datetime(1979, 8, 17, 23, 59, 5))
        .astimezone(pytz.utc),
        "dvd_release": datetime.datetime(2008, 1, 14, 8, 0, 0),
    },
    {
        "title": "And Now for Something Completely Different",
        "release_year": 1971,
        "length_minutes": 88.0,
        "release_date": pytz.timezone("Europe/London")
        .localize(datetime.datetime(1971, 9, 28, 23, 59, 7))
        .astimezone(pytz.utc),
        "dvd_release": datetime.datetime(2003, 10, 22, 10, 0, 0),
    },
]
dataframe = pandas.DataFrame(
    records,
    # In the loaded table, the column order reflects the order of the
    # columns in the DataFrame.
    columns=[
        "title",
        "release_year",
        "length_minutes",
        "release_date",
        "dvd_release",
    ],
    # Optionally, set a named index, which can also be written to the
    # BigQuery table.
    index=pandas.Index(
        ["Q24980", "Q25043", "Q24953", "Q16403"], name="wikidata_id"
    ),
)
job_config = bigquery.LoadJobConfig(
    # Specify a (partial) schema. All columns are always written to the
    # table. The schema is used to assist in data type definitions.
    schema=[
        # Specify the type of columns whose type cannot be auto-detected. For
        # example the "title" column uses pandas dtype "object", so its
        # data type is ambiguous.
        bigquery.SchemaField("title", bigquery.enums.SqlTypeNames.STRING),
        # Indexes are written if included in the schema by name.
        bigquery.SchemaField("wikidata_id", bigquery.enums.SqlTypeNames.STRING),
    ],
    # Optionally, set the write disposition. BigQuery appends loaded rows
    # to an existing table by default, but with WRITE_TRUNCATE write
    # disposition it replaces the table with the loaded data.
    write_disposition="WRITE_TRUNCATE",
)
job = client.load_table_from_dataframe(
    dataframe, table_id, job_config=job_config
)  # Make an API request.
job.result()  # Wait for the job to complete.
table = client.get_table(table_id)  # Make an API request.
print(
    "Loaded {} rows and {} columns to {}".format(
        table.num_rows, len(table.schema), table_id
    )
)
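To adapt this to your case, below is a minimal sketch using the project, dataset, and table names from your question and assuming dfTW03 is the DataFrame you want to load. Columns that hold nested Twitter objects (for example entities, referenced_tweets, and sentiment) may not map cleanly onto a BigQuery type, so serializing them to strings before loading is shown here as a workaround; that part is an assumption about the likely cause of your error, not something verified against your data.

from google.cloud import bigquery

client = bigquery.Client()

# Fully qualified table ID built from the names in the question:
# project "pp-004a", dataset "004a01", table "004a-TW-01".
table_id = "pp-004a.004a01.004a-TW-01"

# Assumption: serialize columns holding nested objects to strings so
# BigQuery can infer a type for them.
df_upload = dfTW03.copy()
for col in ['entities', 'referenced_tweets', 'sentiment']:
    df_upload[col] = df_upload[col].astype(str)

# Append to the existing table (matching if_exists='append' in to_gbq).
job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
job = client.load_table_from_dataframe(df_upload, table_id, job_config=job_config)
job.result()  # Wait for the load job to complete.
print("Loaded {} rows into {}".format(job.output_rows, table_id))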
