I am trying to incrementally load Salesforce data into an Azure SQL database by running a Python script on Azure Databricks.
Since I cannot install the Devart ODBC driver on Azure Databricks, I am trying to use simple_salesforce to fetch the data from Salesforce:
import pandas as pd
import pyodbc
from simple_salesforce import Salesforce, SalesforceLogin, SFType
from sqlalchemy.types import Integer, Text, String, DateTime
from sqlalchemy import create_engine
import urllib
sf = Salesforce(username=username, password=password, security_token=jeton)
# the {} placeholder is filled with the comma-separated field list below
rep_qr = "SELECT {} FROM Account WHERE CONDITION"
soql = rep_qr.format(','.join(field_names))
results = sf.query_all(soql)['records']
I get the following result (an example):
[OrderedDict([('attributes', OrderedDict([('type', 'Account'), ('url', '/services/data/v42.0/sobjects/Account/0014K000009aoU3QAI')])), ('Id', 'XY1'), ('Name', 'Y'), ('Date', '2020-11-24T09:16:17.000+0000')])]
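As an aside, the field_names used to build the SOQL string above is never shown; one way to populate it (a sketch, assuming the standard Account object and the authenticated sf instance from above) is from the object's describe metadata:

# Hypothetical: derive field_names from Account's describe metadata
# rather than hard-coding the field list.
description = sf.Account.describe()
field_names = [field["name"] for field in description["fields"]]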
I then convert the output to a pandas DataFrame:
results = pd.DataFrame(sf.query_all(soql)['records'])
results.drop(columns=['attributes'], inplace=True)  # drop the metadata column, keeping only the queried fields
I get something like this (just an example):
Id | Name | Date |
---|---|---|
XY1 | Y | 2020-11-24T09:16:17.000+0000 |
If the date/time values are always returned as strings of the form 2020-11-24T11:22:33.000+0000, you can use pandas' .apply() method to convert them into the 2020-11-24 11:22:33.000 format that SQL Server will accept, as shown in the sketch below.
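The code below assumes a SQLAlchemy engine pointing at the Azure SQL database and a target table_name, neither of which appears in the question. A minimal setup sketch, with placeholder server, database, and credential values:

import urllib.parse
from sqlalchemy import create_engine

# Placeholder Azure SQL connection values -- substitute your own.
params = urllib.parse.quote_plus(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=myserver.database.windows.net;"
    "DATABASE=mydatabase;"
    "UID=myuser;"
    "PWD=mypassword"
)
engine = create_engine(f"mssql+pyodbc:///?odbc_connect={params}")
table_name = "account_staging"  # hypothetical target table name

With those in place, the conversion and load: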
import pandas as pd
import sqlalchemy as sa
from pprint import pprint

df = pd.DataFrame(
    [
        (1, "2020-11-24T11:22:33.000+0000"),
        (2, None),
        (3, "2020-11-24T12:13:14.000+0000"),
    ],
    columns=["id", "dtm"],
)
print(df)
"""console output:
   id                           dtm
0   1  2020-11-24T11:22:33.000+0000
1   2                          None
2   3  2020-11-24T12:13:14.000+0000
"""

# truncate the UTC offset and swap the "T" separator for a space,
# leaving None values untouched
df["dtm"] = df["dtm"].apply(lambda x: x[:23].replace("T", " ") if x else None)
print(df)
"""console output:
   id                      dtm
0   1  2020-11-24 11:22:33.000
1   2                     None
2   3  2020-11-24 12:13:14.000
"""

df.to_sql(
    table_name,
    engine,
    index=False,
    if_exists="append",
)

# verify what SQL Server actually stored
with engine.begin() as conn:
    pprint(conn.execute(sa.text(f"SELECT * FROM {table_name}")).fetchall())
"""console output:
[(1, datetime.datetime(2020, 11, 24, 11, 22, 33)),
 (2, None),
 (3, datetime.datetime(2020, 11, 24, 12, 13, 14))]
"""