使用powershell从服务总线队列获取所有包含信息的消息



我的问题-是否有可能使用PowerShell从服务总线队列中获取所有消息以及所有信息(如内容和标头(?

拿一个我用的$postService = Invoke-WebRequest -Uri "https://myservice.servicebus.windows.net/test/messages/head" -Headers $header -Method Post

但它返回了不同的消息,我需要相同或全部的列表,不幸的是,我找不到用PS来做这件事的方法,而只能用C#=/

也许有人能帮忙?

下载代码并使用Get-Message从服务总线获取消息。

function Get-Message {
<#
.SYNOPSIS
Receives a message from a receiver client, which is based on the Service Bus Messaging Factory.

.DESCRIPTION
Receives a message from a receiver client, which is based on the Service Bus Messaging Factory.

.OUTPUTS
This Cmdlet returns the Messaging Factory message object. In case of failure is trying '-Retry' times, otherwise returns $null.

.INPUTS
See PARAMETER section for a description of input parameters.

.EXAMPLE
Retrieves a message from ServiceBus. The message is immediately removed from the facility after receipt.

PS > Get-Message;
CorrelationId :
SessionId :
ReplyToSessionId :
DeliveryCount : 1
ExpiresAtUtc : 31.12.9999 23:59:59
LockedUntilUtc :
LockToken :
MessageId : e6dc938213264de790e53ace7949f610
ContentType :
Label :
Properties : {}
ReplyTo :
EnqueuedTimeUtc : 24.12.2015 21:00:00
ScheduledEnqueueTimeUtc : 01.01.0001 00:00:00
SequenceNumber : 1
EnqueuedSequenceNumber : 0
Size : 6
State : Active
TimeToLive : 10675199.02:48:05.4775807
To :
IsBodyConsumed : False

.EXAMPLE
Retrieves a message from ServiceBus and save the message contents (body) to a variable. The message is immediately removed from the facility after receipt.

PS > $message = Get-Message -BodyAsProperty;
PS > write-host $message.Properties['Body']
I am a message body from ServiceBus with an arbitrary content.

.EXAMPLE
Similar to the previous example, but message is explicitly removed from the facility (in case the module default configuration is set to 'PeekLock').

PS > $message = Get-Message -ReceiveAndDelete -BodyAsProperty;
PS > write-host $message.Properties['Body']
I am a message body from ServiceBus with an arbitrary content.

.EXAMPLE
This example receives a message from the service bus. The message will be kept in the queue until it is explicity marked as 'Complete'

PS > $message = Get-Message -Receivemode 'PeekLock';
PS > # do something with the message before you acknowledge receipt
PS > $message.Complete();

.EXAMPLE
This example retrieves a message from the ServiceBus facility 'Topics-Newsletter'. The facility will be created if it does not exist.

PS > $message = Get-Message -Facility 'Topic-Newsletter' -EnsureFacility;

.EXAMPLE
Receives a message from a receiver client, which is based on the Service Bus Messaging Factory against server defined within module configuration xml file.

PS > $receiver = Get-MessageReceiver -Facility 'MyQueue1';
PS > $message = Get-Message -Client $receiver;

#>
[CmdletBinding(
HelpURI = 'http://dfch.biz/biz/dfch/PS/AzureServiceBus/Client/'
)]
[OutputType([Microsoft.ServiceBus.Messaging.BrokeredMessage])]
Param 
(
# [Optional] The WaitTimeOutSec such as '3' Seconds for receiving a message before it times out. If you do not specify this
# value it is taken from the default value = 3 sec.
[Parameter(Mandatory = $false, Position = 0)]
[ValidateNotNullorEmpty()]
[int] $WaitTimeoutSec = [int]::MaxValue
, 
# [Optional] The Receivemode such as 'PeekLock'. If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 1)]
[ValidateSet('PeekLock', 'ReceiveAndDelete')]
[string] $Receivemode = 'ReceiveAndDelete'
,
# [Optional] ReceiveAndDelete same as Receivemode 'ReceiveAndDelete'. If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 2)]
[switch] $ReceiveAndDelete = $false
,
# [Optional] ReceiveAndComplete same as Receivemode 'PeekLock' and $Message.Complete(). If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 3)]
[switch] $ReceiveAndComplete = $false
,
# [Optional] ReceiveAndAbandon same as Receivemode 'PeekLock' and $Message.Abandon(). If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 4)]
[switch] $ReceiveAndAbandon = $false
,
# [Optional] BodyAsProperty writes the Message Body to Property 'Body'. If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 5)]
[switch] $BodyAsProperty = $false
,
# [Optional] The Facility such as 'MyQueue'. If you do not specify this
# value it is taken from the module configuration file.
[Parameter(Mandatory = $false, Position = 6)]
[ValidateNotNullorEmpty()]
[alias("queue")]
[alias("subscription")]
[alias("QueueName")]
[string] $Facility = (Get-Variable -Name $MyInvocation.MyCommand.Module.PrivateData.MODULEVAR -ValueOnly).ReceiveFacility
, 
# [Optional] Specifies if the facility will be created if it does not exist
[Parameter(Mandatory = $false, Position = 7)]
[alias("ensure")]
[alias("broadcast")]
[alias("checkfacility")]
[switch]$EnsureFacility = $false
,
# [Optional] Messaging Client (instance of the MessagingFactory)
[Parameter(Mandatory = $false, Position = 8)]
[alias("MessageClient")]
$Client
,
# [Optional] Skip retrying
[Parameter(Mandatory = $false, Position = 9)]
[switch]$NoRetry = $false
,
# [Optional] The Retry. If you do not specify this
# value it is taken from the module configuration file.
[Parameter(Mandatory=$false, Position = 10)]
[int]$Retry = (Get-Variable -Name $MyInvocation.MyCommand.Module.PrivateData.MODULEVAR -ValueOnly).CommandRetry
,
# [Optional] The RetryInterval. If you do not specify this
# value it is taken from the module configuration file.
[Parameter(Mandatory=$false, Position = 11)]
[int]$RetryInterval = (Get-Variable -Name $MyInvocation.MyCommand.Module.PrivateData.MODULEVAR -ValueOnly).CommandRetryInterval
)
BEGIN 
{
$datBegin = [datetime]::Now;
[string] $fn = $MyInvocation.MyCommand.Name;
Log-Debug $fn ("CALL. Facility '{0}'" -f $Facility ) -fac 1;

# Factory validation
if((Get-Variable -Name $MyInvocation.MyCommand.Module.PrivateData.MODULEVAR -ValueOnly).Factory -isnot [Microsoft.ServiceBus.Messaging.MessagingFactory]) {
$msg = "Factory: Factory validation FAILED. Connect to the server before using the Cmdlet.";
$e = New-CustomErrorRecord -m $msg -cat InvalidData -o (Get-Variable -Name $MyInvocation.MyCommand.Module.PrivateData.MODULEVAR -ValueOnly).Factory;
$PSCmdlet.ThrowTerminatingError($e);
} # if
}
# BEGIN
PROCESS 
{
[boolean] $fReturn = $false;
# Get all parameters
$Params = @{};
$ParamsList = (Get-Command -Name $MyInvocation.InvocationName).Parameters;
foreach ($key in $ParamsList.keys)
{
$var = Get-Variable -Name $key -ErrorAction SilentlyContinue;
if($var)
{
if ( @('Retry', 'RetryInterval', 'NoRetry') -notcontains $($var.name) -and $var.value -ne $null -and $($var.value) -ne '' )  
{
$Params.Add($($var.name), $var.value);
}
}
}
#Log-Debug $fn ("Operation [{0}] arguments: {1}" -f ($fn -replace 'Worker', ''), ($Params | Out-String));

# Retry handling
for($c = 1; $c -le ($Retry+1); $c++)
{
try
{
$OutputParameter = Get-MessageWorker @Params;
break;
}
catch
{
# Throw last execption
if ( $NoRetry -or 
$c -gt $Retry -or 
$_.Exception.Message -match 'Connect to the message factory before using the Cmdlet.' -or
$_.Exception.Message -match 'The message body cannot be read multiple times.'
)
{
if ($PSCmdlet.MyInvocation.BoundParameters["Debug"].IsPresent) 
{
throw;
}
else
{
break;
}    
}
Log-Debug $fn ("[{0}/{1}] Retrying operation [{2}]" -f $c, $Retry, ($fn -replace 'Worker', ''));
Start-Sleep -Seconds $RetryInterval;
$RetryInterval *= 2;
continue;
}
}
return $OutputParameter;
$fReturn = $true;
}
# PROCESS
END 
{
$datEnd = [datetime]::Now;
Log-Debug -fn $fn -msg ("RET. fReturn: [{0}]. Execution time: [{1}]ms. Started: [{2}]." -f $fReturn, ($datEnd - $datBegin).TotalMilliseconds, $datBegin.ToString('yyyy-MM-dd HH:mm:ss.fffzzz')) -fac 2;
}
# END
} # function
if($MyInvocation.ScriptName) { Export-ModuleMember -Function Get-Message; } 
function Get-MessageWorker {
<#
.SYNOPSIS
Receives a message from a receiver client, which is based on the Service Bus Messaging Factory.

.DESCRIPTION
Receives a message from a receiver client, which is based on the Service Bus Messaging Factory.

.OUTPUTS
This Cmdlet returns the MessageId from the Messaging Factory message object. On failure it returns $null.

.INPUTS
See PARAMETER section for a description of input parameters.

.EXAMPLE
$message = Get-MessageWorker;

Receives a message from a receiver client, which is based on the Service Bus Messaging Factory.
#>
[CmdletBinding(
HelpURI = 'http://dfch.biz/biz/dfch/PS/AzureServiceBus/Client/'
)]
[OutputType([Microsoft.ServiceBus.Messaging.BrokeredMessage])]
Param 
(
# [Optional] The WaitTimeOutSec such as '3' Seconds for receiving a message before it times out. If you do not specify this
# value it is taken from the default value = 3 sec.
[Parameter(Mandatory = $false, Position = 0)]
[ValidateNotNullorEmpty()]
[int] $WaitTimeoutSec = [int]::MaxValue
, 
# [Optional] The Receivemode such as 'PeekLock'. If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 1)]
[ValidateSet('PeekLock', 'ReceiveAndDelete')]
[string] $Receivemode = 'ReceiveAndDelete'
,
# [Optional] ReceiveAndDelete same as Receivemode 'ReceiveAndDelete'. If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 2)]
[switch] $ReceiveAndDelete = $false
,
# [Optional] ReceiveAndComplete same as Receivemode 'PeekLock' and $Message.Complete(). If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 3)]
[switch] $ReceiveAndComplete = $false
,
# [Optional] ReceiveAndAbandon same as Receivemode 'PeekLock' and $Message.Abandon(). If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 4)]
[switch] $ReceiveAndAbandon = $false
,
# [Optional] BodyAsProperty writes the Message Body to Property 'Body'. If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 5)]
[switch] $BodyAsProperty = $false
,
# [Optional] The Facility such as 'MyQueue'. If you do not specify this
# value it is taken from the module configuration file.
[Parameter(Mandatory = $false, Position = 6)]
[ValidateNotNullorEmpty()]
[alias("queue")]
[alias("subscription")]
[alias("QueueName")]
[string] $Facility = (Get-Variable -Name $MyInvocation.MyCommand.Module.PrivateData.MODULEVAR -ValueOnly).ReceiveFacility
, 
# [Optional] Checks if facility existing
[Parameter(Mandatory = $false, Position = 7)]
[alias("ensure")]
[alias("broadcast")]
[alias("CreateIfNotExist")]
[switch] $EnsureFacility = $false
,
# [Optional] Messaging Client (instance of the MessagingFactory)
[Parameter(Mandatory = $false, Position = 8)]
[alias("MessageClient")]
$Client
)
BEGIN 
{
$datBegin = [datetime]::Now;
[string] $fn = $MyInvocation.MyCommand.Name;
Log-Debug $fn ("CALL. Facility '{0}'" -f $Facility ) -fac 1;
}
# BEGIN
PROCESS 
{
[boolean] $fReturn = $false;
try 
{
# Parameter validation
if ( $ReceiveAndDelete ) 
{
$Receivemode = 'ReceiveAndDelete';
}

# Check facility
if ( $EnsureFacility ) 
{
$Path = ($Facility -Split "\Subscriptions\")[0];
$SubscriptionName = "RECV-{0}" -f (get-wmiobject Win32_ComputerSystemProduct  | Select-Object -ExpandProperty UUID).toString();
if ( $Facility -match "\Subscriptions\" )
{
$SubscriptionName = ($Facility -Split "\Subscriptions\")[1];
}
try 
{
$FacilitiyExists = New-MessageFacility -Path $Path -Name $SubscriptionName;
$Facility = '{0}Subscriptions{1}' -f $Path, $SubscriptionName;
} 
catch 
{
$msg = $_.Exception.Message;
Log-Error -msg $msg;
$e = New-CustomErrorRecord -m $msg -cat InvalidData -o $Facility;
$PSCmdlet.ThrowTerminatingError($e);
}
}

# Create Client
if ( !$PSBoundParameters.ContainsKey('Client') ) 
{
try 
{
$Client = Get-MessageReceiver -Facility $Facility -Receivemode $Receivemode;
} 
catch 
{
$msg = $_.Exception.Message;
$e = New-CustomErrorRecord -m $msg -cat InvalidData -o $Client;
Log-Error $fn -msg $msg;
$PSCmdlet.ThrowTerminatingError($e);
}
}

# Get Message
try 
{
# DFTODO - is this really an AMQP message when we are using the BrokeredMessage class?, see #5
[Microsoft.ServiceBus.Messaging.BrokeredMessage] $BrokeredMessage = $Client.Receive((New-TimeSpan -Seconds $WaitTimeoutSec));
} catch {
$msg = $_.Exception.Message;
$e = New-CustomErrorRecord -m $msg -cat InvalidData -o $BrokeredMessage;
Log-Error $fn -msg $msg;
$PSCmdlet.ThrowTerminatingError($e);
}

# Process Message
if ( $BrokeredMessage -ne $null ) 
{
if ( $ReceiveAndAbandon -and $Receivemode -ne 'ReceiveAndDelete' ) 
{
$BrokeredMessage.Abandon();
}

if ( $ReceiveAndComplete -and $Receivemode -ne 'ReceiveAndDelete' ) 
{
$BrokeredMessage.Complete();
}

if ( $BodyAsProperty ) 
{
$PropertyName = 'Body';
$PropertyValue = Get-MessageBody -Message $BrokeredMessage;
if ( $BrokeredMessage.Properties.ContainsKey($PropertyName) -and $BrokeredMessage.Properties[$PropertyName].toString() -ne $PropertyValue.toString() ) 
{
[int] $PropertyCount = 1;
$PropertyName = ("Body{0}" -f $PropertyCount);
while( $BrokeredMessage.Properties.ContainsKey($PropertyName) )
{
$PropertyCount += 1;
$PropertyName = ("Body{0}" -f $PropertyCount);
}
}
$BrokeredMessage.Properties[$PropertyName] = $PropertyValue;
}
}

$OutputParameter = $BrokeredMessage;
$fReturn = $true;
}
catch 
{
if($gotoSuccess -eq $_.Exception.Message) 
{
$fReturn = $true;
} 
else 
{
[string] $ErrorText = "catch [$($_.FullyQualifiedErrorId)]";
$ErrorText += (($_ | fl * -Force) | Out-String);
$ErrorText += (($_.Exception | fl * -Force) | Out-String);
$ErrorText += (Get-PSCallStack | Out-String);

if($_.Exception -is [System.Net.WebException]) 
{
Log-Critical $fn "Login to Uri '$Uri' with Username '$Username' FAILED [$_].";
Log-Debug $fn $ErrorText -fac 3;
}
else 
{
Log-Error $fn $ErrorText -fac 3;
if($gotoError -eq $_.Exception.Message) 
{
Log-Error $fn $e.Exception.Message;
$PSCmdlet.ThrowTerminatingError($e);
} 
elseif($gotoFailure -ne $_.Exception.Message) 
{ 
Write-Verbose ("$fn`n$ErrorText"); 
} 
else 
{
# N/A
}
}
$fReturn = $false;
$OutputParameter = $null;
}
}
finally 
{
# Clean up
# N/A
}
return $OutputParameter;
}
# PROCESS
END 
{
$datEnd = [datetime]::Now;
Log-Debug -fn $fn -msg ("RET. fReturn: [{0}]. Execution time: [{1}]ms. Started: [{2}]." -f $fReturn, ($datEnd - $datBegin).TotalMilliseconds, $datBegin.ToString('yyyy-MM-dd HH:mm:ss.fffzzz')) -fac 2;
}
# END
} # function

我为此编写了一个.NET包装,它公开了可以从PowerShell本机调用的PowerShell cmdlet。可以找到存储库在https://github.com/sumanthvrao/AzurePowerShellCmdlets.

导入dll后,可以使用类似的Get-ServiceBusMessagecmdlet从服务总线主题连接并获取消息

$message = Get-ServiceBusMessage -connectionString $env:connString -topicName "test" -subscriptionName "abc"

最新更新