我想导入以下输出:
kubectl get pods -o json
将放入python pandas数据框架中。这也应该包含所有的容器和那里的资源请求和限制。
我的代码开始如下:
import json
import numpy as np
import pandas as pd
import os
pods_raw = os.popen('kubectl get pods -o json').read()
pods_json = json.loads(pods_raw)['items']
从这里开始,我努力以正确的方式在数据框架中获取数据,特别是当多个容器存在时,应该拆分'spec.containers'。
下面是如何将感兴趣的数据提取到数据框架的示例。输出只是一个示例(因为您没有在问题中指定所需的输出):
import json
import pandas as pd
# open the Json data from file (or use os.popen):
with open("data.json", "r") as f_in:
data = json.load(f_in)
df = pd.DataFrame(data["items"])
# metadata:
df = pd.concat(
[df, df.pop("metadata").apply(pd.Series).add_prefix("meta_")], axis=1
)
# spec:
df = pd.concat(
[df, df.pop("spec").apply(pd.Series).add_prefix("spec_")], axis=1
)
# status:
df = pd.concat(
[df, df.pop("status").apply(pd.Series).add_prefix("status_")], axis=1
)
# keep only columns of interests:
df = df[["meta_name", "meta_namespace", "status_phase", "spec_containers"]]
# explode spec_containers column
df = df.explode("spec_containers")
df = pd.concat(
[
df,
df.pop("spec_containers")
.apply(pd.Series)
.add_prefix("spec_")[["spec_image", "spec_name"]],
],
axis=1,
)
print(df)
打印:
meta_name meta_namespace status_phase spec_image spec_name
0 apache-lb-648c5cb8cb-mw5zh default Running httpd apache
0 apache-lb-648c5cb8cb-mw5zh default Running index.docker.io/istio/proxyv2:1.13.4 istio-proxy
1 csi-cephfsplugin-fc79l default Running rocks.canonical.com:443/cdk/sig-storage/csi-node-driver-registrar:v2.0.1 driver-registrar
1 csi-cephfsplugin-fc79l default Running rocks.canonical.com:443/cdk/cephcsi/cephcsi:v3.3.1 csi-cephfsplugin
1 csi-cephfsplugin-fc79l default Running rocks.canonical.com:443/cdk/cephcsi/cephcsi:v3.3.1 liveness-prometheus
...and so on.
目前我有以下代码来解决这个问题:
#!/usr/bin/env python
import json
import pandas as pd
import os
kb = 1024
mb = kb * kb
gb = mb * kb
tb = gb * kb
def main():
pods_raw = os.popen('kubectl get pods -A -o json').read()
pods_json = json.loads(pods_raw)['items']
first_split = ['status','metadata','spec']
second_split = ['spec.containers','spec.containers.resources',"spec.containers.resources.limits","spec.containers.resources.requests"]
df_pods = pd.DataFrame.from_dict(pods_json)
df_pods = concat_data(df_pods, first_split)
df_pods = expand_data(df_pods, ['spec.containers'])
df_pods = concat_data(df_pods, second_split)
df_pods.index
df_pods.index.name='index'
col_to_normalize = ['spec.containers.resources.limits.cpu',
'spec.containers.resources.limits.memory',
'spec.containers.resources.requests.cpu',
'spec.containers.resources.requests.memory']
for col_name in col_to_normalize:
df_pods[col_name] = df_pods[col_name].map(normalize_values)
df_pods[col_to_normalize] = df_pods.groupby('index')[col_to_normalize].sum()
df_pods = df_pods.drop_duplicates(['metadata.name'])
df_pods[df_pods['status.phase'] == 'Running']
print(df_pods)
def concat_data(df: pd.DataFrame, expands: list) -> pd.DataFrame:
for expantion in expands:
# df = pd.concat( [df, df.pop(expantion).apply(pd.Series).add_prefix(f"{expantion}.")], axis=1)
df = pd.concat( [df, df.pop(expantion).apply(pd.Series).add_prefix(f"{expantion}.")], axis=1)
return df
def expand_data(df: pd.DataFrame, expands: list) -> pd.DataFrame:
for expantion in expands:
s = df[expantion].apply(pd.Series).stack()
s.index = s.index.droplevel(-1)
s.index
df.index = [x for x in df.index]
del df[expantion]
s.name = expantion
df=df.join(s)
return df
def normalize_values(val: str) -> int:
try:
if val[-1] == 'm':
return int(val[:-1]) / 1000
if val[-2].lower() == "k":
return int(val[:-2]) * kb
if val[-2].lower() == "m":
return int(val[:-2]) * mb
if val[-2].lower() == "g":
return int(val[:-2]) * gb
if val[-2].lower() == "t":
return int(val[:-2]) * tb
return int(val)
except:
return 0
if __name__ == '__main__':
main()
这个工作得很好,除了下面的FutureWarning我得到,不知道如何解决这个问题:
./resources.py:43: FutureWarning: The default dtype for empty Series will be 'object' instead of 'float64' in a future version. Specify a dtype explicitly to silence this warning.
df = pd。concat([df, df.pop(expand).apply(pd.Series).add_prefix(f"{expand}.")], axis=1)