Improve feature mode switch process (#12188)

* Fix kube mode to local mode long duration issue

* Remove IPV6 parameters which are not necessary

* Fix read node labels bug

* Tag the running image to latest if it's stable

* Disable image_version_higher check

* Change image_version_higher checker test case

Signed-off-by: Yun Li <yunli1@microsoft.com>
This commit is contained in:
lixiaoyuner 2022-11-02 17:24:32 +08:00 committed by GitHub
parent a31a4e7f82
commit e1440f0044
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 165 additions and 51 deletions

View File

@ -10,3 +10,4 @@ tests/__pycache__/
ctrmgr/__pycache__/
venv
tests/.coverage*
.pytest_cache/

View File

@ -30,6 +30,10 @@ STATE = "state"
KUBE_LABEL_TABLE = "KUBE_LABELS"
KUBE_LABEL_SET_KEY = "SET"
SERVER_TABLE = "KUBERNETES_MASTER"
SERVER_KEY = "SERVER"
ST_SER_CONNECTED = "connected"
ST_SER_UPDATE_TS = "update_time"
# Get seconds to wait for remote docker to start.
# If not, revert to local
@ -75,8 +79,10 @@ def read_data(is_config, feature, fields):
ret = []
db = cfg_db if is_config else state_db
tbl = swsscommon.Table(db, FEATURE_TABLE)
if feature == SERVER_KEY:
tbl = swsscommon.Table(db, SERVER_TABLE)
else:
tbl = swsscommon.Table(db, FEATURE_TABLE)
data = dict(tbl.get(feature)[1])
for (field, default) in fields:
@ -104,6 +110,13 @@ def read_state(feature):
[(CURRENT_OWNER, "none"), (REMOTE_STATE, "none"), (CONTAINER_ID, "")])
def read_server_state():
    """ Read the kube server state from STATE-DB.

    Reads the SERVER entry of the KUBERNETES_MASTER table (read_data with
    is_config=False selects the state DB) and returns a tuple of:
      connected   -- "true"/"false", defaulting to "false"
      update_time -- timestamp of the last server-state update, default ""
    """
    return read_data(False, SERVER_KEY,
            [(ST_SER_CONNECTED, "false"), (ST_SER_UPDATE_TS, "")])
def docker_action(action, feature, **kwargs):
""" Execute docker action """
try:
@ -192,9 +205,10 @@ def container_start(feature, **kwargs):
set_owner, fallback, _ = read_config(feature)
_, remote_state, _ = read_state(feature)
server_connected, _ = read_server_state()
debug_msg("{}: set_owner:{} fallback:{} remote_state:{}".format(
feature, set_owner, fallback, remote_state))
debug_msg("{}: set_owner:{} fallback:{} remote_state:{} server_connected:{}".format(
feature, set_owner, fallback, remote_state, server_connected))
data = {
SYSTEM_STATE: "up",
@ -207,8 +221,9 @@ def container_start(feature, **kwargs):
start_val = START_LOCAL
else:
start_val = START_KUBE
if fallback and (remote_state == "none"):
if fallback and (remote_state == "none" or server_connected == "false"):
start_val |= START_LOCAL
data[REMOTE_STATE] = "none"
if start_val == START_LOCAL:
# Implies *only* local.

View File

@ -232,14 +232,14 @@ def container_up(feature, owner, version):
do_freeze(feature, "This version is marked disabled. Exiting ...")
return
if not instance_higher(feature, state_data[VERSION], version):
# TODO: May Remove label <feature_name>_<version>_enabled
# Else kubelet will continue to re-deploy every 5 mins, until
# master removes the lable to un-deploy.
#
do_freeze(feature, "bail out as current deploy version {} is not higher".
format(version))
return
# if not instance_higher(feature, state_data[VERSION], version):
# # TODO: May Remove label <feature_name>_<version>_enabled
# # Else kubelet will continue to re-deploy every 5 mins, until
# # master removes the label to un-deploy.
# #
# do_freeze(feature, "bail out as current deploy version {} is not higher".
# format(version))
# return
update_data(state_db, feature, { VERSION: version })

View File

@ -60,7 +60,7 @@ dflt_cfg_ser = {
CFG_SER_IP: "",
CFG_SER_PORT: "6443",
CFG_SER_DISABLE: "false",
CFG_SER_INSECURE: "false"
CFG_SER_INSECURE: "true"
}
dflt_st_ser = {
@ -88,18 +88,20 @@ dflt_st_feat= {
JOIN_LATENCY = "join_latency_on_boot_seconds"
JOIN_RETRY = "retry_join_interval_seconds"
LABEL_RETRY = "retry_labels_update_seconds"
TAG_IMAGE_LATEST = "tag_latest_image_on_wait_seconds"
USE_K8S_PROXY = "use_k8s_as_http_proxy"
remote_ctr_config = {
JOIN_LATENCY: 10,
JOIN_RETRY: 10,
LABEL_RETRY: 2,
TAG_IMAGE_LATEST: 30,
USE_K8S_PROXY: ""
}
def log_debug(m):
msg = "{}: {}".format(inspect.stack()[1][3], m)
print(msg)
#print(msg)
syslog.syslog(syslog.LOG_DEBUG, msg)
@ -148,6 +150,8 @@ def init():
with open(SONIC_CTR_CONFIG, "r") as s:
d = json.load(s)
remote_ctr_config.update(d)
if UNIT_TESTING:
remote_ctr_config[TAG_IMAGE_LATEST] = 0
class MainServer:
@ -172,11 +176,11 @@ class MainServer:
self.db_connectors[db_name] = swsscommon.DBConnector(db_name, 0)
def register_timer(self, ts, handler):
def register_timer(self, ts, handler, args=()):
""" Register timer based handler.
The handler will be called on/after give timestamp, ts
"""
self.timer_handlers[ts].append(handler)
self.timer_handlers[ts].append((handler, args))
def register_handler(self, db_name, table_name, handler):
@ -235,7 +239,7 @@ class MainServer:
lst = self.timer_handlers[k]
del self.timer_handlers[k]
for fn in lst:
fn()
fn[0](*fn[1])
else:
timeout = (k - ct_ts).seconds
break
@ -426,6 +430,54 @@ class RemoteServerHandler:
format(remote_ctr_config[JOIN_RETRY], self.start_time))
def tag_latest_image(server, feat, docker_id, image_ver):
    """ Tag the feature's currently running image as <repo>:latest.

    Scheduled via a timer once the feature's remote_state turns "running",
    so the image is only promoted after it has stayed up for the configured
    wait period (tag_latest_image_on_wait_seconds).

    server:    MainServer instance; only used in unit-test mode to record
               the tag in the DB instead of shelling out to docker.
    feat:      feature / container name
    docker_id: id of the running kube-managed container
    image_ver: image version string (used for log messages only)

    Returns 0 on success, non-zero on failure.
    """
    res = 1
    if not UNIT_TESTING:
        # Confirm the container is still running; a stopped container is
        # not considered stable and must not be tagged latest.
        status = os.system("docker ps |grep {} >/dev/null".format(docker_id))
        if status:
            syslog.syslog(syslog.LOG_ERR,
                    "Feature {}:{} is not stable".format(feat, image_ver))
        else:
            # Resolve the container's image id (short 12-char form) via
            # docker inspect, then map it back to its repository name.
            image_item = os.popen("docker inspect {} |jq -r .[].Image".format(docker_id)).read().strip()
            if image_item:
                image_id = image_item.split(":")[1][:12]
                image_info = os.popen("docker images |grep {}".format(image_id)).read().split()
                if image_info:
                    # First column of `docker images` output is the repo name.
                    image_rep = image_info[0]
                    res = os.system("docker tag {} {}:latest".format(image_id, image_rep))
                    if res != 0:
                        syslog.syslog(syslog.LOG_ERR,
                                "Failed to tag {}:{} to latest".format(image_rep, image_ver))
                    else:
                        syslog.syslog(syslog.LOG_INFO,
                                "Successfully tag {}:{} to latest".format(image_rep, image_ver))
                        # After a successful tag, clean up any leftover local
                        # container that carries the bare feature name: stop it
                        # if it is running, then remove it.
                        feat_status = os.popen("docker inspect {} |jq -r .[].State.Running".format(feat)).read().strip()
                        if feat_status:
                            if feat_status == 'true':
                                os.system("docker stop {}".format(feat))
                                syslog.syslog(syslog.LOG_ERR,
                                        "{} should not run, stop it".format(feat))
                            os.system("docker rm {}".format(feat))
                            syslog.syslog(syslog.LOG_INFO,
                                    "Delete previous {} container".format(feat))
                else:
                    syslog.syslog(syslog.LOG_ERR,
                            "Failed to docker images |grep {} to get image repo".format(image_id))
            else:
                syslog.syslog(syslog.LOG_ERR,
                        "Failed to inspect container:{} to get image id".format(docker_id))
    else:
        # Unit-test mode: skip docker entirely and record the tag in STATE-DB
        # so the test can assert on it.
        server.mod_db_entry(STATE_DB_NAME,
                FEATURE_TABLE, feat, {"tag_latest": "true"})
        res = 0
    if res:
        log_debug("failed to tag {}:{} to latest".format(feat, image_ver))
    else:
        log_debug("successfully tag {}:{} to latest".format(feat, image_ver))
    return res
#
# Feature changes
#
@ -523,6 +575,19 @@ class FeatureTransitionHandler:
self.st_data[key] = _update_entry(dflt_st_feat, data)
remote_state = self.st_data[key][ST_FEAT_REMOTE_STATE]
if (old_remote_state != remote_state) and (remote_state == "running"):
# Tag latest
start_time = datetime.datetime.now() + datetime.timedelta(
seconds=remote_ctr_config[TAG_IMAGE_LATEST])
self.server.register_timer(start_time, tag_latest_image, (
self.server,
key,
self.st_data[key][ST_FEAT_CTR_ID],
self.st_data[key][ST_FEAT_CTR_VER]))
log_debug("try to tag latest label after {} seconds @{}".format(
remote_ctr_config[TAG_IMAGE_LATEST], start_time))
if (not init) and (
(old_remote_state == remote_state) or (remote_state != "pending")):
# no change or nothing to do.

View File

@ -84,7 +84,7 @@ def _run_command(cmd, timeout=5):
def kube_read_labels():
""" Read current labels on node and return as dict. """
KUBECTL_GET_CMD = "kubectl --kubeconfig {} get nodes {} --show-labels |tr -s ' ' | cut -f6 -d' '"
KUBECTL_GET_CMD = "kubectl --kubeconfig {} get nodes {} --show-labels --no-headers |tr -s ' ' | cut -f6 -d' '"
labels = {}
ret, out, _ = _run_command(KUBECTL_GET_CMD.format(
@ -332,12 +332,12 @@ def _do_reset(pending_join = False):
def _do_join(server, port, insecure):
KUBEADM_JOIN_CMD = "kubeadm join --discovery-file {} --node-name {} --apiserver-advertise-address {}"
KUBEADM_JOIN_CMD = "kubeadm join --discovery-file {} --node-name {}"
err = ""
out = ""
ret = 0
try:
local_ipv6 = _get_local_ipv6()
#local_ipv6 = _get_local_ipv6()
#_download_file(server, port, insecure)
_gen_cli_kubeconf(server, port, insecure)
_do_reset(True)
@ -349,7 +349,7 @@ def _do_join(server, port, insecure):
if ret == 0:
(ret, out, err) = _run_command(KUBEADM_JOIN_CMD.format(
KUBE_ADMIN_CONF, get_device_name(), local_ipv6), timeout=60)
KUBE_ADMIN_CONF, get_device_name()), timeout=60)
log_debug("ret = {}".format(ret))
except IOError as e:

View File

@ -3,6 +3,7 @@
"retry_join_interval_seconds": 30,
"retry_labels_update_seconds": 5,
"revert_to_local_on_wait_seconds": 60,
"tag_latest_image_on_wait_seconds": 600,
"use_k8s_as_http_proxy": "n"
}

View File

@ -169,7 +169,7 @@ startup_test_data = {
common_test.FEATURE_TABLE: {
"snmp": {
"container_id": "no_change",
"container_version": "20201230.77",
"container_version": "20201230.11",
"current_owner": "no_change",
"remote_state": "no_change",
"system_state": "up"

View File

@ -125,6 +125,11 @@ start_test_data = {
"current_owner": "none",
"container_id": ""
}
},
common_test.SERVER_TABLE: {
"SERVER": {
"connected": "true"
}
}
}
},

View File

@ -106,7 +106,7 @@ server_test_data = {
common_test.KUBE_JOIN: {
"ip": "10.10.10.10",
"port": "6443",
"insecure": "false"
"insecure": "true"
}
}
},
@ -151,7 +151,7 @@ server_test_data = {
common_test.KUBE_JOIN: {
"ip": "10.10.10.10",
"port": "6443",
"insecure": "false"
"insecure": "true"
},
common_test.KUBE_RESET: {
"flag": "true"
@ -276,6 +276,51 @@ feature_test_data = {
}
}
}
},
3: {
common_test.DESCR: "Tag image latest when remote_state changes to running",
common_test.ARGS: "ctrmgrd",
common_test.PRE: {
common_test.CONFIG_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"set_owner": "kube"
}
}
},
common_test.STATE_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"remote_state": "pending"
}
}
}
},
common_test.UPD: {
common_test.CONFIG_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"set_owner": "kube"
}
}
},
common_test.STATE_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"remote_state": "running"
}
}
}
},
common_test.POST: {
common_test.STATE_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"tag_latest": "true"
}
}
}
}
}
}

View File

@ -27,7 +27,7 @@ read_labels_test_data = {
common_test.DESCR: "read labels",
common_test.RETVAL: 0,
common_test.PROC_CMD: ["\
kubectl --kubeconfig {} get nodes none --show-labels |tr -s ' ' | cut -f6 -d' '".format(KUBE_ADMIN_CONF)],
kubectl --kubeconfig {} get nodes none --show-labels --no-headers |tr -s ' ' | cut -f6 -d' '".format(KUBE_ADMIN_CONF)],
common_test.PROC_OUT: ["foo=bar,hello=world"],
common_test.POST: {
"foo": "bar",
@ -40,7 +40,7 @@ kubectl --kubeconfig {} get nodes none --show-labels |tr -s ' ' | cut -f6 -d' '"
common_test.TRIGGER_THROW: True,
common_test.RETVAL: -1,
common_test.PROC_CMD: ["\
kubectl --kubeconfig {} get nodes none --show-labels |tr -s ' ' | cut -f6 -d' '".format(KUBE_ADMIN_CONF)],
kubectl --kubeconfig {} get nodes none --show-labels --no-headers |tr -s ' ' | cut -f6 -d' '".format(KUBE_ADMIN_CONF)],
common_test.POST: {
},
common_test.PROC_KILLED: 1
@ -49,7 +49,7 @@ kubectl --kubeconfig {} get nodes none --show-labels |tr -s ' ' | cut -f6 -d' '"
common_test.DESCR: "read labels fail",
common_test.RETVAL: -1,
common_test.PROC_CMD: ["\
kubectl --kubeconfig {} get nodes none --show-labels |tr -s ' ' | cut -f6 -d' '".format(KUBE_ADMIN_CONF)],
kubectl --kubeconfig {} get nodes none --show-labels --no-headers |tr -s ' ' | cut -f6 -d' '".format(KUBE_ADMIN_CONF)],
common_test.PROC_OUT: [""],
common_test.PROC_ERR: ["command failed"],
common_test.POST: {
@ -64,7 +64,7 @@ write_labels_test_data = {
common_test.RETVAL: 0,
common_test.ARGS: { "foo": "bar", "hello": "World!", "test": "ok" },
common_test.PROC_CMD: [
"kubectl --kubeconfig {} get nodes none --show-labels |tr -s ' ' | cut -f6 -d' '".format(KUBE_ADMIN_CONF),
"kubectl --kubeconfig {} get nodes none --show-labels --no-headers |tr -s ' ' | cut -f6 -d' '".format(KUBE_ADMIN_CONF),
"kubectl --kubeconfig {} label --overwrite nodes none hello-".format(
KUBE_ADMIN_CONF),
"kubectl --kubeconfig {} label --overwrite nodes none hello=World! test=ok".format(
@ -77,7 +77,7 @@ write_labels_test_data = {
common_test.RETVAL: 0,
common_test.ARGS: { "foo": "bar", "hello": "world" },
common_test.PROC_CMD: [
"kubectl --kubeconfig {} get nodes none --show-labels |tr -s ' ' | cut -f6 -d' '".format(KUBE_ADMIN_CONF)
"kubectl --kubeconfig {} get nodes none --show-labels --no-headers |tr -s ' ' | cut -f6 -d' '".format(KUBE_ADMIN_CONF)
],
common_test.PROC_OUT: ["foo=bar,hello=world"]
},
@ -87,7 +87,7 @@ write_labels_test_data = {
common_test.ARGS: { "any": "thing" },
common_test.RETVAL: -1,
common_test.PROC_CMD: [
"kubectl --kubeconfig {} get nodes none --show-labels |tr -s ' ' | cut -f6 -d' '".format(KUBE_ADMIN_CONF)
"kubectl --kubeconfig {} get nodes none --show-labels --no-headers |tr -s ' ' | cut -f6 -d' '".format(KUBE_ADMIN_CONF)
],
common_test.PROC_ERR: ["read failed"]
}
@ -110,19 +110,10 @@ none".format(KUBE_ADMIN_CONF),
"mkdir -p {}".format(CNI_DIR),
"cp {} {}".format(FLANNEL_CONF_FILE, CNI_DIR),
"systemctl start kubelet",
"kubeadm join --discovery-file {} --node-name none --apiserver-advertise-address FC00:2::32".format(
"kubeadm join --discovery-file {} --node-name none".format(
KUBE_ADMIN_CONF)
],
common_test.PROC_RUN: [True, True],
common_test.PRE: {
common_test.CONFIG_DB_NO: {
common_test.MGMT_INTERFACE_TABLE: {
"eth0|FC00:2::32/64": {
"gwaddr": "fc00:2::1"
}
}
}
},
common_test.REQ: {
"data": {"ca.crt": "test"}
}
@ -143,19 +134,10 @@ none".format(KUBE_ADMIN_CONF),
"mkdir -p {}".format(CNI_DIR),
"cp {} {}".format(FLANNEL_CONF_FILE, CNI_DIR),
"systemctl start kubelet",
"kubeadm join --discovery-file {} --node-name none --apiserver-advertise-address FC00:2::32".format(
"kubeadm join --discovery-file {} --node-name none".format(
KUBE_ADMIN_CONF)
],
common_test.PROC_RUN: [True, True],
common_test.PRE: {
common_test.CONFIG_DB_NO: {
common_test.MGMT_INTERFACE_TABLE: {
"eth0|FC00:2::32/64": {
"gwaddr": "fc00:2::1"
}
}
}
},
common_test.REQ: {
"data": {"ca.crt": "test"}
}