将Azure存储表服务数据复制到SQL Server



如何使用PowerShell将大量数据从Azure存储表服务复制到SQL Server?Microsoft建议使用的AzTable不支持在没有分区密钥基本信息的情况下进行增量加载,文档网站也关闭了…:https://learn.microsoft.com/en-us/azure/storage/tables/table-storage-how-to-use-powershell

我自己回答这个问题,因为我很难在网上找到解决方案,我想帮助其他人解决这个问题。

$StorageAccountResourceGroup = "MyResourceGroup"
$StorageAccountName = "MyStorageAccount"
$TableName = "MyStorageTableName"
$SqlConnectionString = "MySqlConnectionString"
$BulkCopy = [System.Data.SqlClient.SqlBulkCopy]::new()
$BulkCopy.DestinationTableName = "[myschema].[mytablename]"
$DataTable = <<Generate datatable from SQL-table>> # Code not included
Connect-AzAccount
$StorageAccountKey = (Get-AzStorageAccountKey -ResourceGroupName $StorageAccountResourceGroup -AccountName  $StorageAccountName | Where-Object -FilterScript {$_.KeyName -eq "Key1"}).Value
$Context = New-AzStorageContext -StorageAccountName $StorageAccountName -StorageAccountKey $StorageAccountKey
$CloudTable = (Get-AzStorageTable -Context $Context -Name $TableName).CloudTable
$TableQuery = [Microsoft.Azure.Cosmos.Table.TableQuery]::new()
$Token = $null
BulkCounter = 0
do{
$ReturnObject = $CloudTable.ExecuteQuerySegmented($TableQuery, $Token)
$Token = $ReturnObject.ContinuationToken

foreach($Entry in $ReturnObject){
$Row = $DataTable.NewRow()
foreach($Column in $DataTable.Columns){
if($Column.ColumnName -eq "TimeStamp"){
$Value = $Entry.TimeStamp
} elseif($Column.ColumnName -eq "RowKey"){
$Value = $Entry.RowKey
} elseif($Column.ColumnName -eq "PartitionKey"){
$Value = $Entry.PartitionKey
} else {
if($Column.DataType -eq [System.Decimal]){
$Value = $Entry.Properties.$($Column.ColumnName).DoubleValue
} elseif($Column.DataType -eq [System.String]){
$Value = $Entry.Properties.$($Column.ColumnName).StringValue
} elseif($Column.DataType -eq [System.Guid]){
$Value = $Entry.Properties.$($Column.ColumnName).GuidValue
} elseif($Column.DataType -eq [System.datetimeoffset]){
$Value = $Entry.Properties.$($Column.ColumnName).DateTimeOffsetValue
} elseif($Column.DataType -eq [System.Int32]){
$Value = $Entry.Properties.$($Column.ColumnName).Int32Value
} elseif($Column.DataType -eq [System.Int64]){
$Value = $Entry.Properties.$($Column.ColumnName).Int64Value
} elseif($Column.DataType -eq [System.Boolean]){
$Value = $Entry.Properties.$($Column.ColumnName).BooleanValue
} elseif($Column.DataType -eq [System.Binary]){
$Value = $Entry.Properties.$($Column.ColumnName).BinaryValue
} 
}
if([System.String]::IsNullOrWhiteSpace($Value)){
$Value = [System.DBNull]::value
}
$Row.($Column.ColumnName) = $Value
}
$DataTable.Rows.Add($Row)
$Counter++

}
$BulkCounter ++
if($BulkCounter % 25 -eq 0 -and $BulkCounter -ne 0){
$BulkCopy.WriteToServer($Datatable.CreateDataReader()) | Out-Null
$Datatable.Clear() | Out-Null
$BulkCounter = 0
}

} while ($Token)
if($Datatable.rows.count -gt 0){
$BulkCopy.WriteToServer($Datatable.CreateDataReader()) | Out-Null   
$Datatable.Clear() | Out-Null
}

这里还有一些从存储帐户表自动生成表的功能:

function ConvertTo-SqlTypeFromEDM {
param (
$EDMType
)
if($EDMType -eq "String"){
return "varchar(100)"
} elseif($EDMType -eq "Guid"){
return "uniqueidentifier"
} elseif ($EDMType -eq "DateTime"){
return "datetimeoffset(0)"
} elseif ($EDMType -eq "Binary"){
return "varbinary(max)"
} elseif ($EDMType -eq "Int32"){
return "int"
} elseif ($EDMType -eq "Int64"){
return "long"
} elseif ($EDMType -eq "Double"){
return "decimal(25,5)"
} elseif ($EDMType -eq "Boolean"){
return "bit"
} else {
throw "Tybe $EDMType not implemented"
}
}

function Get-SqlQueryFromStorageTableSerivce {
param(
$CloudTable,
$SchemaName
)
$Query = [Microsoft.Azure.Cosmos.Table.TableQuery]::new()
$Query.TakeCount = 1
$Token = $null
$Result = $CloudTable.ExecuteQuerySegmented($Query, $Token)
$TableName = $CloudTable.Name
$TableString = "DROP TABLE IF EXISTS [$SchemaName].[$TableName]`n"
$TableString += "CREATE TABLE [$SchemaName].[$TableName] (`n`t"
$ColumnString = @()
$TableKeys = @()
if($Result.PartitionKey){
$ColumnString += "[PartitionKey] varchar(100) NOT NULL"
$TableKeys += "[PartitionKey]"
}
if($Result.RowKey){
$ColumnString += "[RowKey] varchar(100) NOT NULL"
$TableKeys += "[RowKey]"
}
if($Result.Timestamp){
$ColumnString += "[Timestamp] datetimeoffset(0) NULL"
}
foreach($Column in $Result.Properties.Keys){
$ColumnString += "[$Column] $(ConvertTo-SqlTypeFromEDM -EDMType $Result.Properties.$Column.PropertyType) NULL"
}
$TableString += $ColumnString -join ",`n`t"
$TableString += "`nCONSTRAINT [PK_1_$($TableName)_1] PRIMARY KEY CLUSTERED`n"
$TableString += "(`n`t" + ($TableKeys -join ",`n`t")
$TableString += "`n)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF, DATA_COMPRESSION = PAGE) ON [PRIMARY]"
$TableString += "`n) ON [PRIMARY]`n"
return $TableString
}

最新更新