Does Kubernetes 1.9.6 support proper connection draining?



I have been trying to get zero-downtime deploys working with Kubernetes. I have tried a Service of type LoadBalancer on AWS, I have tried the nginx ingress, and I have tried hitting a NodePort directly (I also tried IPVS).

For the most part it works, and most requests complete fine. To test it, I continuously redeploy a dummy app that serves a sequence number and check that no requests get dropped. The monitoring app runs in the same Kubernetes cluster, but its requests go out through an AWS ELB and back to the dummy app.

After it has run for a few hours, there are always a handful of requests whose connections get dropped, that get 504s back from the ELB, or that hit connection timeouts. If I make the requests take 500 ms longer to respond, there are even more bad requests.

It looks as though Kubernetes is not draining connections from the old pods, it is simply cutting them off.

I have been trying to dig through the Kubernetes codebase to see whether any connection draining happens, but without much luck. In pkg/proxy/iptables/proxier.go, under syncProxyRules, it appears to set up all of the iptables rules, but at least at that level there is no awareness of connection draining.

TL;DR

I cannot get Kubernetes to deploy a new version without occasionally dropping connections. Am I missing some connection-draining option, or does Kubernetes simply not support it?

My setup

Kubernetes QoS application

package main

import (
    "context"
    "fmt"
    "io/ioutil"
    "net/http"
    "os"
    "os/signal"
    "strconv"
    "syscall"
    "time"

    "github.com/DataDog/datadog-go/statsd"
    apiv1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/util/retry"
)

func ApiCheck(address string, ddclient *statsd.Client) {
    // creates the in-cluster config
    config, err := rest.InClusterConfig()
    if err != nil {
        panic(err.Error())
    }
    // creates the clientset
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }
    deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)
    // if we just came up, wait for other app to go down before updating deployment...
    time.Sleep(30 * time.Second)
    for {
        fmt.Println("Starting deployment")
        start := time.Now()
        var prevValue int
        retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
            // get latest version
            result, getErr := deploymentsClient.Get("k8s-qos-dummy", metav1.GetOptions{})
            if getErr != nil {
                fmt.Printf("Failed to get latest version of Deployment: %v", getErr)
                return getErr
            }
            var perr error
            prevValue, perr = strconv.Atoi(result.Spec.Template.Spec.Containers[0].Args[1])
            if perr != nil {
                fmt.Printf("Cannot parse previous value %s, using 0 instead\n", result.Spec.Template.Spec.Containers[0].Args[1])
                prevValue = 0
            }
            valstring := fmt.Sprintf("%d", prevValue+1)
            result.Spec.Template.Spec.Containers[0].Args[1] = valstring
            fmt.Printf("Trying to update to %s\n", valstring)
            _, updateErr := deploymentsClient.Update(result)
            return updateErr
        })
        if retryErr != nil {
            fmt.Printf("Update failed: %v\n", retryErr)
            CheckDDError(ddclient.Incr("qos.k8s.deploy.update_error", nil, 1))
            continue
        }
        fmt.Printf("Updated successfully\n")
        // now wait for server to respond properly
        for {
            client := &http.Client{
                Timeout: time.Second * 5,
            }
            response, err := client.Get(address)
            if err != nil {
                fmt.Printf("Failed qos deploy with http error: %s\n", err)
                CheckDDError(ddclient.Incr("qos.k8s.deploy.http_error", nil, 1))
            } else {
                contents, err := ioutil.ReadAll(response.Body)
                // close right away; a defer would pile up inside this endless loop
                response.Body.Close()
                if err != nil {
                    fmt.Printf("Failed qos deploy with io error: %s\n", err)
                    CheckDDError(ddclient.Incr("qos.k8s.deploy.io_error", nil, 1))
                } else {
                    if response.StatusCode >= 200 && response.StatusCode <= 299 {
                        // let's check the value
                        newValue, perr := strconv.Atoi(string(contents))
                        if perr != nil {
                            fmt.Printf("Failed to parse response for deploy %s\n", perr.Error())
                        } else {
                            if newValue == prevValue+1 {
                                fmt.Println("Deployment confirmed!")
                                elapsed := time.Since(start)
                                CheckDDError(ddclient.Timing("qos.k8s.deploy.time", elapsed, nil, 1))
                                time.Sleep(30 * time.Second)
                                break
                            } else {
                                fmt.Printf("Got bad value: %d, wanted %d\n", newValue, prevValue+1)
                                elapsed := time.Since(start)
                                if elapsed > time.Second*80 {
                                    CheckDDError(ddclient.Incr("qos.k8s.deploy.timeout_err", nil, 1))
                                    CheckDDError(ddclient.Timing("qos.k8s.deploy.time", elapsed, nil, 1))
                                    time.Sleep(30 * time.Second)
                                    break
                                }
                            }
                        }
                    } else {
                        fmt.Printf("Failed qos deploy with http status error: %d %s\n", response.StatusCode, string(contents))
                        CheckDDError(ddclient.Incr("qos.k8s.deploy.http_status_error", nil, 1))
                    }
                }
            }
            time.Sleep(1 * time.Second)
        }
    }
}

func CheckDDError(derr error) {
    if derr != nil {
        fmt.Printf("datadogs not working, got: %s\n", derr.Error())
    }
}

func DummyCheck(address string, ddclient *statsd.Client) {
    for {
        client := &http.Client{
            Timeout: time.Second * 5,
        }
        response, err := client.Get(address)
        if err != nil {
            fmt.Printf("Failed qos check with http error: %s\n", err)
            CheckDDError(ddclient.Gauge("qos.k8s.check.dummy_response", 0, nil, 1))
            CheckDDError(ddclient.Incr("qos.k8s.check.dummy_http_error", nil, 1))
        } else {
            contents, err := ioutil.ReadAll(response.Body)
            // close right away; a defer would pile up inside this endless loop
            response.Body.Close()
            if err != nil {
                fmt.Printf("Failed qos check with io error: %s\n", err)
                CheckDDError(ddclient.Gauge("qos.k8s.check.dummy_response", 0, nil, 1))
                CheckDDError(ddclient.Incr("qos.k8s.check.dummy_io_error", nil, 1))
            } else {
                if response.StatusCode >= 200 && response.StatusCode <= 299 {
                    fmt.Printf("Passed qos check with status: %d received: %s\n", response.StatusCode, string(contents))
                    CheckDDError(ddclient.Gauge("qos.k8s.check.dummy_response", 1, nil, 1))
                } else {
                    fmt.Printf("Failed qos check with http status error: %d %s\n", response.StatusCode, string(contents))
                    CheckDDError(ddclient.Gauge("qos.k8s.check.dummy_response", 0, nil, 1))
                    CheckDDError(ddclient.Incr("qos.k8s.check.dummy_http_status_error", nil, 1))
                }
            }
        }
        time.Sleep(1 * time.Second)
    }
}

func WebServer(resp string, ddclient *statsd.Client) {
    srv := &http.Server{
        Addr:        ":7070",
        IdleTimeout: 61 * time.Second, // ELB defaults to a 60 second idle timeout
    }
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprint(w, resp)
    })
    fmt.Printf("current idle timeout: %v\n", srv.IdleTimeout)
    c := make(chan os.Signal, 1) // buffered so the signal is never dropped
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)
    go func() {
        <-c
        fmt.Printf("Got sigterm, shutting down\n")
        srv.Shutdown(context.Background())
        os.Exit(1)
    }()
    srv.ListenAndServe()
}

func main() {
    if len(os.Args) < 2 {
        fmt.Println("usage: k8s-qos [deploy|dummy]")
        return
    }
    ddSock := fmt.Sprintf("%s:8125", os.Getenv("HOST_IP"))
    ddc, err := statsd.New(ddSock)
    if err != nil {
        return
    }
    if os.Args[1] == "deploy" {
        if len(os.Args) < 3 {
            fmt.Println("usage: k8s-qos deploy [address]")
            return
        }
        pingAddress := os.Args[2]
        go WebServer(
            fmt.Sprintf(
                "Hey this is the deployer for qos, this app pings %s to make sure it works",
                pingAddress),
            ddc)
        go ApiCheck(pingAddress, ddc)
        DummyCheck(pingAddress, ddc)
        return
    }
    if os.Args[1] == "dummy" {
        if len(os.Args) < 3 {
            fmt.Println("usage: k8s-qos dummy [response-string]")
            return
        }
        WebServer(os.Args[2], ddc)
        return
    }
    fmt.Println("no usage specified")
}

Kubernetes cluster

Set up on AWS with kops version 1.9.0.

kops cluster config:

apiVersion: kops/v1alpha2
kind: Cluster
metadata:
  name: test-k8s.example.com
spec:
  additionalPolicies:
    node: |
      [
        {
          "Effect": "Allow",
          "Action": ["sts:AssumeRole"],
          "Resource": ["*"]
        }
      ]
  api:
    loadBalancer:
      type: Internal
  authorization:
    rbac: {}
  channel: stable
  cloudProvider: aws
  configBase: s3://test-k8s/test-k8s.example.com
  etcdClusters:
  - etcdMembers:
    - instanceGroup: master-us-west-2a
      name: a
    - instanceGroup: master-us-west-2b
      name: b
    - instanceGroup: master-us-west-2c
      name: c
    name: main
  - etcdMembers:
    - instanceGroup: master-us-west-2a
      name: a
    - instanceGroup: master-us-west-2b
      name: b
    - instanceGroup: master-us-west-2c
      name: c
    name: events
  iam:
    allowContainerRegistry: true
    legacy: false
  kubernetesApiAccess:
  - 0.0.0.0/0
  kubernetesVersion: 1.9.6
  masterInternalName: api.internal.test-k8s.example.com
  masterPublicName: api.test-k8s.example.com
  networkCIDR: XX.XX.0.0/16
  networkID: vpc-XXXXXXX
  networking:
    weave:
      mtu: 8912
  nonMasqueradeCIDR: XX.XX.0.0/10
  sshAccess:
  - XX.XX.XX.XX/32
  subnets:
  - cidr: XX.XX.XX.XX/24
    id: subnet-XXXXXXX
    name: us-west-2a
    type: Private
    zone: us-west-2a
  - cidr: XX.XX.XX.XX/24
    id: subnet-XXXXXXX
    name: us-west-2b
    type: Private
    zone: us-west-2b
  - cidr: XX.XX.XX.XX/24
    id: subnet-XXXXXXX
    name: us-west-2c
    type: Private
    zone: us-west-2c
  - cidr: XX.XX.XX.XX/24
    id: subnet-XXXXXXX
    name: utility-us-west-2a
    type: Utility
    zone: us-west-2a
  - cidr: XX.XX.XX.XX/24
    id: subnet-XXXXXXX
    name: utility-us-west-2b
    type: Utility
    zone: us-west-2b
  - cidr: XX.XX.XX.XX/24
    id: subnet-XXXXXXX
    name: utility-us-west-2c
    type: Utility
    zone: us-west-2c
  topology:
    dns:
      type: Private
    masters: private
    nodes: private

kops node config:

apiVersion: kops/v1alpha2
kind: InstanceGroup
metadata:
  labels:
    kops.k8s.io/cluster: test-k8s.example.com
  name: nodes
spec:
  image: XXXXXXXX/normal-kops-image-but-with-portmap-cni
  machineType: t2.medium
  maxSize: 3
  minSize: 3
  nodeLabels:
    kops.k8s.io/instancegroup: nodes
  role: Node
  subnets:
  - us-west-2a
  - us-west-2b
  - us-west-2c

Kubernetes application configuration

"伪"应用程序配置,也就是正在重新部署的应用程序:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: k8s-qos-dummy
spec:
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 1
      maxSurge: 1
  minReadySeconds: 1
  replicas: 3
  selector:
    matchLabels:
      app: k8s-qos-dummy
  template:
    metadata:
      name: k8s-qos-dummy
      labels:
        app: k8s-qos-dummy
    spec:
      containers:
      - name: k8s-qos-dummy
        image: XXXXXX
        command: ["k8s-qos"]
        args: [ "dummy", "1" ]
        env:
        - name: HOST_IP
          valueFrom:
            fieldRef:
              fieldPath: status.hostIP
        ports:
        - containerPort: 7070
        livenessProbe:
          httpGet:
            path: /
            port: 7070
          initialDelaySeconds: 20
          periodSeconds: 2
        readinessProbe:
          httpGet:
            path: /
            port: 7070
          initialDelaySeconds: 5
          periodSeconds: 5
          successThreshold: 1
        lifecycle:
          preStop:
            exec:
              command: ["/bin/sleep", "61"]
        resources:
          limits:
            memory: "200Mi"
            cpu: ".25"
          requests:
            cpu: ".25"
            memory: "200Mi"
---
apiVersion: v1
kind: Service
metadata:
  name: k8s-qos-dummy
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-backend-protocol: http
    service.beta.kubernetes.io/aws-load-balancer-ssl-cert: XXXXXX
    service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "443"
    service.beta.kubernetes.io/aws-load-balancer-internal: 0.0.0.0/0
    service.beta.kubernetes.io/aws-load-balancer-extra-security-groups: "sg-XXXXX"
spec:
  ports:
  - port: 80
    name: http
    targetPort: 7070
  - port: 443
    name: https
    targetPort: 7070
  selector:
    app: k8s-qos-dummy
  type: LoadBalancer
  loadBalancerSourceRanges:
  - 127.0.0.0/32
---
# when testing with ingress
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: k8s-qos-dummy-ingress
spec:
  rules:
  - host: k8s-qos-dummy.example.com
    http:
      paths:
      - backend:
          serviceName: k8s-qos-dummy
          servicePort: 443

The redeployer/monitor app config:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: k8s-qos-role
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: default
  name: k8s-qos-role
rules:
- apiGroups: ["apps"]
  resources: ["deployments"]
  verbs: ["get", "watch", "list", "update"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: k8s-qos-role
subjects:
- kind: ServiceAccount
  namespace: default
  name: k8s-qos-role
roleRef:
  kind: Role
  name: k8s-qos-role
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: k8s-qos
spec:
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 0
      maxSurge: 1
  minReadySeconds: 5
  replicas: 1
  selector:
    matchLabels:
      app: k8s-qos
  template:
    metadata:
      name: k8s-qos
      labels:
        app: k8s-qos
    spec:
      serviceAccountName: k8s-qos-role
      containers:
      - name: k8s-qos
        image: XXXXXX
        command: ["k8s-qos"]
        args: [ "deploy", "https://k8s-qos-dummy.example.com/"]
        env:
        - name: HOST_IP
          valueFrom:
            fieldRef:
              fieldPath: status.hostIP
        ports:
        - containerPort: 7070
        livenessProbe:
          httpGet:
            path: /
            port: 7070
          initialDelaySeconds: 20
          periodSeconds: 2
        readinessProbe:
          httpGet:
            path: /
            port: 7070
          initialDelaySeconds: 0
          periodSeconds: 2
        resources:
          limits:
            memory: "400Mi"
            cpu: ".5"
          requests:
            cpu: ".25"
            memory: "200Mi"

Kubernetes does support connection draining - it is called graceful termination

In this Stack Overflow question you will find a comprehensive answer on what it is and how it works. So, to be clear, what you are seeing is the intended behavior, as described in this GitHub issue: after a pod is deleted, Kubernetes waits the "grace period" number of seconds before killing it. The pod only has to catch SIGTERM, and from that point on it should start failing its readiness probes; the load balancer should then stop sending traffic to that pod. If the pod is not removed from the load balancer "in time" before it dies, all of its current connections get killed. I think that in your case you have to look for a solution inside the application, or try some external tooling - if I remember correctly, Istio has features that would help, but I do not have enough experience with it to point you at them directly.
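
For reference, here is a minimal sketch of what that pattern could look like in the dummy server above: on SIGTERM the process starts failing a dedicated readiness endpoint, keeps serving while the Service endpoints and the ELB deregister it, and only then shuts down. The /ready path, the 45-second drain window, and the 30-second shutdown timeout are assumptions for illustration, not values from the question, and the drain window has to stay within the pod's terminationGracePeriodSeconds.

package main

import (
    "context"
    "fmt"
    "net/http"
    "os"
    "os/signal"
    "sync/atomic"
    "syscall"
    "time"
)

func main() {
    var terminating int32 // set to 1 once SIGTERM arrives

    mux := http.NewServeMux()
    mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprint(w, "ok")
    })
    // the readinessProbe would point here instead of "/" (assumed path for this sketch)
    mux.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
        if atomic.LoadInt32(&terminating) == 1 {
            http.Error(w, "shutting down", http.StatusServiceUnavailable)
            return
        }
        w.WriteHeader(http.StatusOK)
    })

    srv := &http.Server{Addr: ":7070", Handler: mux, IdleTimeout: 61 * time.Second}

    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGTERM, os.Interrupt)
    done := make(chan struct{})
    go func() {
        <-sigs
        atomic.StoreInt32(&terminating, 1) // readiness now fails; in-flight requests keep working
        // give kube-proxy and the ELB time to stop sending new connections;
        // 45s is an assumed placeholder and must fit inside the grace period
        time.Sleep(45 * time.Second)
        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()
        srv.Shutdown(ctx) // finish in-flight requests, then let ListenAndServe return
        close(done)
    }()

    if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
        fmt.Println("server error:", err)
        return
    }
    <-done // wait for Shutdown to finish draining before the process exits
}

The preStop sleep 61 in the Deployment above attacks the same problem from the pod spec side, by delaying SIGTERM; either way, the key point is that the backend has to keep accepting traffic until the load balancer has actually stopped sending it.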

Latest update