Refactor the logic of tagging kube container as local latest (#14367)
Why I did it
We found a bug during a pilot: the tag function does not strip the ACR domain from the repository name when tagging, so the latest tag does not work. The original tag function also calls os.system and os.popen, which are not recommended, so it needs to be refactored.
How I did it
Split on "/" when getting image_rep, to fix the ACR domain bug.
Refactor the tag function code and add test cases.
How to verify it
Check whether container images are tagged as latest when in kube mode.
This commit is contained in:
parent a6d597a811
commit 5a77cc4b58
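The essence of the fix, as a small standalone sketch (the sample docker output strings come from the new test data further down, not from a live system): the repository column of docker images may carry an ACR registry domain, so only the last "/"-separated piece is used as the tag target.

image_item = "sha256:5425bcbd23c54270d9de028c09634f8e9a014e9351387160c133ccf3a53ab3dc"   # docker inspect <id> |jq -r .[].Image
image_info = "acr.io/snmp v1 5425bcbd23c5"                                               # docker images |grep <image_id>

image_id = image_item.split(":")[1][:12]          # "5425bcbd23c5"
image_rep = image_info.split()[0].split("/")[-1]  # "snmp"; the old code kept "acr.io/snmp"

print("docker tag {} {}:latest".format(image_id, image_rep))
# -> docker tag 5425bcbd23c5 snmp:latest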
@@ -54,6 +54,16 @@ ST_FEAT_SYS_STATE = "system_state"
KUBE_LABEL_TABLE = "KUBE_LABELS"
KUBE_LABEL_SET_KEY = "SET"

MODE_KUBE = "kube"
MODE_LOCAL = "local"
OWNER_KUBE = "kube"
OWNER_LOCAL = "local"
OWNER_NONE = "none"
REMOTE_READY = "ready"
REMOTE_PENDING = "pending"
REMOTE_STOPPED = "stopped"
REMOTE_NONE = "none"

remote_connected = False

dflt_cfg_ser = {
@@ -89,6 +99,7 @@ JOIN_LATENCY = "join_latency_on_boot_seconds"
JOIN_RETRY = "retry_join_interval_seconds"
LABEL_RETRY = "retry_labels_update_seconds"
TAG_IMAGE_LATEST = "tag_latest_image_on_wait_seconds"
TAG_RETRY = "retry_tag_latest_seconds"
USE_K8S_PROXY = "use_k8s_as_http_proxy"

remote_ctr_config = {
@@ -96,6 +107,7 @@ remote_ctr_config = {
    JOIN_RETRY: 10,
    LABEL_RETRY: 2,
    TAG_IMAGE_LATEST: 30,
    TAG_RETRY: 5,
    USE_K8S_PROXY: ""
    }

@@ -151,9 +163,6 @@ def init():
    with open(SONIC_CTR_CONFIG, "r") as s:
        d = json.load(s)
        remote_ctr_config.update(d)
    if UNIT_TESTING:
        remote_ctr_config[TAG_IMAGE_LATEST] = 0


class MainServer:
    """ Implements main io-loop of the application
@@ -437,55 +446,6 @@ class RemoteServerHandler:
            log_debug("kube_join_master failed retry after {} seconds @{}".
                    format(remote_ctr_config[JOIN_RETRY], self.start_time))


def tag_latest_image(server, feat, docker_id, image_ver):
    res = 1
    if not UNIT_TESTING:
        status = os.system("docker ps |grep {} >/dev/null".format(docker_id))
        if status:
            syslog.syslog(syslog.LOG_ERR,
                    "Feature {}:{} is not stable".format(feat, image_ver))
        else:
            image_item = os.popen("docker inspect {} |jq -r .[].Image".format(docker_id)).read().strip()
            if image_item:
                image_id = image_item.split(":")[1][:12]
                image_info = os.popen("docker images |grep {}".format(image_id)).read().split()
                if image_info:
                    image_rep = image_info[0]
                    res = os.system("docker tag {} {}:latest".format(image_id, image_rep))
                    if res != 0:
                        syslog.syslog(syslog.LOG_ERR,
                                "Failed to tag {}:{} to latest".format(image_rep, image_ver))
                    else:
                        syslog.syslog(syslog.LOG_INFO,
                                "Successfully tag {}:{} to latest".format(image_rep, image_ver))
                        feat_status = os.popen("docker inspect {} |jq -r .[].State.Running".format(feat)).read().strip()
                        if feat_status:
                            if feat_status == 'true':
                                os.system("docker stop {}".format(feat))
                                syslog.syslog(syslog.LOG_ERR,
                                        "{} should not run, stop it".format(feat))
                            os.system("docker rm {}".format(feat))
                            syslog.syslog(syslog.LOG_INFO,
                                    "Delete previous {} container".format(feat))
                else:
                    syslog.syslog(syslog.LOG_ERR,
                            "Failed to docker images |grep {} to get image repo".format(image_id))
            else:
                syslog.syslog(syslog.LOG_ERR,
                        "Failed to inspect container:{} to get image id".format(docker_id))
    else:
        server.mod_db_entry(STATE_DB_NAME,
                FEATURE_TABLE, feat, {"tag_latest": "true"})
        res = 0
    if res:
        log_debug("failed to tag {}:{} to latest".format(feat, image_ver))
    else:
        log_debug("successfully tag {}:{} to latest".format(feat, image_ver))

    return res


#
# Feature changes
#
@@ -512,7 +472,9 @@ class FeatureTransitionHandler:
        # There after only called upon changes in either that requires action
        #
        if not is_systemd_active(feat):
            # Nothing todo, if system state is down
            # Restart the service manually when kube upgrade happens to decrease the down time
            if set_owner == MODE_KUBE and ct_owner == OWNER_NONE and remote_state == REMOTE_STOPPED:
                restart_systemd_service(self.server, feat, OWNER_KUBE)
            return

        label_add = set_owner == "kube"
@@ -587,8 +549,7 @@
            # Tag latest
            start_time = datetime.datetime.now() + datetime.timedelta(
                    seconds=remote_ctr_config[TAG_IMAGE_LATEST])
            self.server.register_timer(start_time, tag_latest_image, (
                self.server,
            self.server.register_timer(start_time, self.do_tag_latest, (
                key,
                self.st_data[key][ST_FEAT_CTR_ID],
                self.st_data[key][ST_FEAT_CTR_VER]))
@@ -596,10 +557,13 @@
            log_debug("try to tag latest label after {} seconds @{}".format(
                remote_ctr_config[TAG_IMAGE_LATEST], start_time))

        if (not init) and (
                (old_remote_state == remote_state) or (remote_state != "pending")):
            # no change or nothing to do.
            return
        if (not init):
            if (old_remote_state == remote_state):
                # if no remote state change, do nothing.
                return
            if (remote_state not in (REMOTE_PENDING, REMOTE_STOPPED)):
                # if remote state not in pending or stopped, do nothing.
                return

        if key in self.cfg_data:
            log_debug("{} init={} old_remote_state={} remote_state={}".format(key, init, old_remote_state, remote_state))
@@ -607,7 +571,18 @@
                    self.st_data[key][ST_FEAT_OWNER],
                    remote_state)
        return

    def do_tag_latest(self, feat, docker_id, image_ver):
        ret = kube_commands.tag_latest(feat, docker_id, image_ver)
        if ret != 0:
            # Tag latest failed. Retry after an interval
            self.start_time = datetime.datetime.now()
            self.start_time += datetime.timedelta(
                    seconds=remote_ctr_config[TAG_RETRY])
            self.server.register_timer(self.start_time, self.do_tag_latest, (feat, docker_id, image_ver))

            log_debug("Tag latest as local failed retry after {} seconds @{}".
                    format(remote_ctr_config[TAG_RETRY], self.start_time))

#
# Label re-sync
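For illustration, a minimal self-contained sketch of the retry pattern do_tag_latest uses. This is not ctrmgrd code: the sched-based scheduler and fake_tag_latest below stand in for MainServer.register_timer and kube_commands.tag_latest.

import sched
import time

TAG_RETRY_SECONDS = 5                      # stands in for remote_ctr_config[TAG_RETRY]
scheduler = sched.scheduler(time.time, time.sleep)

def fake_tag_latest(feat, docker_id, image_ver):
    # Stand-in for kube_commands.tag_latest(): fail on the first call, succeed after.
    fake_tag_latest.calls = getattr(fake_tag_latest, "calls", 0) + 1
    return 1 if fake_tag_latest.calls == 1 else 0

def do_tag_latest(feat, docker_id, image_ver):
    ret = fake_tag_latest(feat, docker_id, image_ver)
    if ret != 0:
        # Non-zero means tagging failed: schedule the same callback again after the
        # interval, mirroring what register_timer() does with remote_ctr_config[TAG_RETRY].
        scheduler.enter(TAG_RETRY_SECONDS, 1, do_tag_latest, (feat, docker_id, image_ver))
        print("tag latest failed, retry in {} seconds".format(TAG_RETRY_SECONDS))

do_tag_latest("snmp", "123456", "v1")
scheduler.run()                            # runs the retry after ~5 seconds, then exits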
@@ -412,7 +412,76 @@ def kube_reset_master(force):

    return (ret, err)

def _do_tag(docker_id, image_ver):
    err = ""
    out = ""
    ret = 1
    status, _, err = _run_command("docker ps |grep {}".format(docker_id))
    if status == 0:
        _, image_item, err = _run_command("docker inspect {} |jq -r .[].Image".format(docker_id))
        if image_item:
            image_id = image_item.split(":")[1][:12]
            _, image_info, err = _run_command("docker images |grep {}".format(image_id))
            if image_info:
                # Only need the docker repo name without acr domain
                image_rep = image_info.split()[0].split("/")[-1]
                tag_res, _, err = _run_command("docker tag {} {}:latest".format(image_id, image_rep))
                if tag_res == 0:
                    out = "docker tag {} {}:latest successfully".format(image_id, image_rep)
                    ret = 0
                else:
                    err = "Failed to tag {}:{} to latest. Err: {}".format(image_rep, image_ver, err)
            else:
                err = "Failed to docker images |grep {} to get image repo. Err: {}".format(image_id, err)
        else:
            err = "Failed to inspect container:{} to get image id. Err: {}".format(docker_id, err)
    elif err:
        err = "Error happens when execute docker ps |grep {}. Err: {}".format(docker_id, err)
    else:
        out = "New version {} is not running.".format(image_ver)
        ret = -1

    return (ret, out, err)

def _remove_container(feat):
    err = ""
    out = ""
    ret = 0
    _, feat_status, err = _run_command("docker inspect {} |jq -r .[].State.Running".format(feat))
    if feat_status:
        if feat_status == 'true':
            err = "Feature {} container is running, it's unexpected".format(feat)
            ret = 1
        else:
            rm_res, _, err = _run_command("docker rm {}".format(feat))
            if rm_res == 0:
                out = "Remove origin local {} container successfully".format(feat)
            else:
                err = "Failed to docker rm {}. Err: {}".format(feat, err)
                ret = 1
    elif err.startswith("Error: No such object"):
        out = "Origin local {} container has been removed before".format(feat)
        err = ""
    else:
        err = "Failed to docker inspect {} |jq -r .[].State.Running. Err: {}".format(feat, err)
        ret = 1

    return (ret, out, err)

def tag_latest(feat, docker_id, image_ver):
    ret, out, err = _do_tag(docker_id, image_ver)
    if ret == 0:
        log_debug(out)
        ret, out, err = _remove_container(feat)
        if ret == 0:
            log_debug(out)
        else:
            log_error(err)
    elif ret == -1:
        ret = 0
    else:
        log_error(err)
    return ret

def main():
    syslog.openlog("kube_commands")
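_do_tag and _remove_container above rely on the existing _run_command helper, which this diff does not show. Below is a plausible sketch of its contract only, inferred from the call sites (it must return a (returncode, stdout, stderr) tuple) and from the tests patching kube_commands.subprocess.Popen; the actual implementation may differ.

import subprocess

def _run_command(cmd):
    # Sketch only: run a shell command (callers pass shell pipelines such as
    # "docker ps |grep <id>") and return (returncode, stripped stdout, stripped stderr).
    proc = subprocess.Popen(cmd, shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True)
    out, err = proc.communicate()
    return (proc.returncode, out.strip(), err.strip())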
@@ -4,6 +4,7 @@
    "retry_labels_update_seconds": 5,
    "revert_to_local_on_wait_seconds": 60,
    "tag_latest_image_on_wait_seconds": 600,
    "retry_tag_latest_seconds": 30,
    "use_k8s_as_http_proxy": "n"
}

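How these two new keys reach the daemon, per the init() hunk earlier in this diff: the hard-coded remote_ctr_config defaults are overlaid with whatever the JSON file provides. A minimal illustration; the production config path is an assumption, only the keys and values come from the diff.

import json

# Built-in defaults from ctrmgrd (subset shown earlier in this diff).
remote_ctr_config = {
    "tag_latest_image_on_wait_seconds": 30,   # TAG_IMAGE_LATEST
    "retry_tag_latest_seconds": 5,            # TAG_RETRY
}

CONFIG_PATH = "/etc/sonic/remote_ctr.config.json"   # assumed location of the file above

with open(CONFIG_PATH) as s:
    remote_ctr_config.update(json.load(s))          # JSON values override the defaults

print(remote_ctr_config["tag_latest_image_on_wait_seconds"])   # 600 with the JSON above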
@@ -58,6 +58,7 @@ PROC_FAIL = "proc_fail"
PROC_THROW = "proc_throw"
PROC_OUT = "subproc_output"
PROC_ERR = "subproc_error"
PROC_CODE = "subproc_code"
PROC_KILLED = "procs_killed"

# container_start test cases
@@ -605,6 +606,7 @@ class mock_proc:

        out_lst = current_test_data.get(PROC_OUT, None)
        err_lst = current_test_data.get(PROC_ERR, None)
        code_lst = current_test_data.get(PROC_CODE, None)
        if out_lst:
            assert (len(out_lst) > self.index)
            out = out_lst[self.index]
@@ -615,7 +617,11 @@
            err = err_lst[self.index]
        else:
            err = ""
        self.returncode = 0 if not err else -1
        if code_lst:
            assert (len(code_lst) > self.index)
            self.returncode = code_lst[self.index]
        else:
            self.returncode = 0 if not err else -1
        return (out, err)

    def kill(self):
@@ -673,7 +679,8 @@ def create_remote_ctr_config_json():
"join_latency_on_boot_seconds": 2,\n\
"retry_join_interval_seconds": 0,\n\
"retry_labels_update_seconds": 0,\n\
"revert_to_local_on_wait_seconds": 5\n\
"revert_to_local_on_wait_seconds": 5,\n\
"tag_latest_image_on_wait_seconds": 0\n\
}\n'

    fname = "/tmp/remote_ctr.config.json"
@@ -311,15 +311,6 @@ feature_test_data = {
                    }
                }
            }
        },
        common_test.POST: {
            common_test.STATE_DB_NO: {
                common_test.FEATURE_TABLE: {
                    "snmp": {
                        "tag_latest": "true"
                    }
                }
            }
        }
    }
}
@@ -213,6 +213,102 @@ none".format(KUBE_ADMIN_CONF),
    }
}

tag_latest_test_data = {
    0: {
        common_test.DESCR: "Tag latest successfuly and remove origin local container",
        common_test.RETVAL: 0,
        common_test.ARGS: ["snmp", "123456", "v1"],
        common_test.PROC_CMD: [
            "docker ps |grep 123456",
            "docker inspect 123456 |jq -r .[].Image",
            "docker images |grep 5425bcbd23c5",
            "docker tag 5425bcbd23c5 snmp:latest",
            "docker inspect snmp |jq -r .[].State.Running",
            "docker rm snmp"
        ],
        common_test.PROC_OUT: [
            "",
            "sha256:5425bcbd23c54270d9de028c09634f8e9a014e9351387160c133ccf3a53ab3dc",
            "acr.io/snmp v1 5425bcbd23c5",
            "",
            "false",
            ""
        ]
    },
    1: {
        common_test.DESCR: "Tag latest successfuly and origin local container has been removed before",
        common_test.RETVAL: 0,
        common_test.ARGS: ["snmp", "123456", "v1"],
        common_test.PROC_CMD: [
            "docker ps |grep 123456",
            "docker inspect 123456 |jq -r .[].Image",
            "docker images |grep 5425bcbd23c5",
            "docker tag 5425bcbd23c5 snmp:latest",
            "docker inspect snmp |jq -r .[].State.Running",
            "docker rm snmp"
        ],
        common_test.PROC_OUT: [
            "",
            "sha256:5425bcbd23c54270d9de028c09634f8e9a014e9351387160c133ccf3a53ab3dc",
            "acr.io/snmp v1 5425bcbd23c5",
            "",
            "",
            ""
        ],
        common_test.PROC_ERR: [
            "",
            "",
            "",
            "",
            "Error: No such object",
            ""
        ]
    },
    2: {
        common_test.DESCR: "Tag a unstable container",
        common_test.RETVAL: 0,
        common_test.ARGS: ["snmp", "123456", "v1"],
        common_test.PROC_CMD: [
            "docker ps |grep 123456"
        ],
        common_test.PROC_CODE: [
            1
        ]
    },
    3: {
        common_test.DESCR: "Docker error",
        common_test.RETVAL: 1,
        common_test.ARGS: ["snmp", "123456", "v1"],
        common_test.PROC_CMD: [
            "docker ps |grep 123456"
        ],
        common_test.PROC_ERR: [
            "err"
        ]
    },
    4: {
        common_test.DESCR: "Find local container is still running",
        common_test.RETVAL: 1,
        common_test.ARGS: ["snmp", "123456", "v1"],
        common_test.PROC_CMD: [
            "docker ps |grep 123456",
            "docker inspect 123456 |jq -r .[].Image",
            "docker images |grep 5425bcbd23c5",
            "docker tag 5425bcbd23c5 snmp:latest",
            "docker inspect snmp |jq -r .[].State.Running",
            "docker rm snmp"
        ],
        common_test.PROC_OUT: [
            "",
            "sha256:5425bcbd23c54270d9de028c09634f8e9a014e9351387160c133ccf3a53ab3dc",
            "acr.io/snmp v1 5425bcbd23c5",
            "",
            "true",
            ""
        ]
    }
}

class TestKubeCommands(object):

    def init(self):
@@ -360,3 +456,14 @@ clusters:\n\
                ct_data[common_test.ARGS][0])
            if common_test.RETVAL in ct_data:
                assert ret == ct_data[common_test.RETVAL]

    @patch("kube_commands.subprocess.Popen")
    def test_tag_latest(self, mock_subproc):
        common_test.set_kube_mock(mock_subproc)

        for (i, ct_data) in tag_latest_test_data.items():
            common_test.do_start_test("tag:latest", i, ct_data)

            ret = kube_commands.tag_latest(*ct_data[common_test.ARGS])
            if common_test.RETVAL in ct_data:
                assert ret == ct_data[common_test.RETVAL]