Keda servicebus:由Keda scaledJob的一些pod处理的所有消息



我使用的是Azure Kubernetes Service(AKS)平台,使用的是KEDA ' scaledjob ';用于长时间运行的作业。在此,Azure服务总线队列触发器已用于自动触发作业。现在,当我在Azure服务总线中添加消息时,KEDA将根据配置自动触发作业并创建节点/pod。但在这种情况下,所有的消息都被一些pod接收和处理。期望每个扩展pod处理单个消息并终止。

下面是我的yml文件

apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: {{ .Chart.Name }}
spec:
jobTargetRef:
backoffLimit: 4
parallelism: 1
completions: 1
activeDeadlineSeconds: 300
template:
spec:
imagePullSecrets:
- name: {{ .Values.image.imagePullSecrets }}
terminationGracePeriodSeconds: 30
dnsPolicy: ClusterFirst
volumes:
- name: azure
azureFile:
shareName: sharenameone
secretName: secret-sharenameone
readOnly: true
- name: one-storage
emptyDir: {}
- name: "logging-volume-file"
persistentVolumeClaim:
claimName: "azure-file-logging"        
initContainers:
- name: test-java-init
image: {{ .Values.global.imageRegistryURI }}/{{ .Values.image.javaInitImage.name}}:{{ .Values.image.javaInitImage.tag }}
imagePullPolicy: {{ .Values.image.pullPolicy }}
securityContext:
readOnlyRootFilesystem: true
resources:
requests:
cpu: 100m
memory: 300Mi
limits:
cpu: 200m
memory: 400Mi
volumeMounts:
- name: azure
mountPath: /mnt/azure
- name: one-storage
mountPath: /certs
containers:
- name: {{ .Chart.Name }}
image: {{ .Values.global.imageRegistryURI }}/tests/{{ .Chart.Name }}:{{ .Values.version }}
imagePullPolicy: {{ .Values.image.pullPolicy }}
env:
{{- include "chart.envVars" . | nindent 14 }}
- name: JAVA_OPTS
value: >-
{{ .Values.application.javaOpts }}          
- name: application_name
value: "test_application"
- name: queueName
value: "test-queue-name"
- name: servicebusconnstrenv
valueFrom: 
secretKeyRef:
name: secrets-service-bus
key: service_bus_conn_str
volumeMounts:
- name: cert-storage
mountPath: /certs
- name: "logging-volume-azure-file"
mountPath: "/mnt/logging"
resources:
{{- toYaml .Values.resources | nindent 14 }}                   
pollingInterval: 30
maxReplicaCount: 5
successfulJobsHistoryLimit: 5
failedJobsHistoryLimit: 20
triggers:
- type: azure-servicebus
metadata:
queueName: "test-queue-name"
connectionFromEnv: servicebusconnstrenv
messageCount: "1"

这是我的azure函数监听器

@FunctionName("TestServiceBusTrigger")
public void TestServiceBusTriggerHandler(
@ServiceBusQueueTrigger(
name = "msg",
queueName = "%TEST_QUEUE_NAME%",
connection = "ServiceBusConnectionString")
final String inputMessage,
final ExecutionContext context) {
final java.util.logging.Logger contextLogger = context.getLogger();
System.setProperty("javax.net.ssl.trustStore", "/certs/cacerts");

try {
// all the processing goes here 
} catch (Exception e) {
//Exception handling
}
}

需要添加哪些配置,以便每个扩展pod处理单个消息并终止?

这不是Azure Functions的设计方式,甚至不是KEDA的一般使用方式。如果已经运行的容器处理尽可能多的消息,那将是更理想的。

也就是说,如果你的场景仍然需要这个,你可以写一个简单的脚本,直接使用Azure服务总线SDK来获取一条消息,处理它,然后终止。

相关内容

  • 没有找到相关文章

最新更新