Clean up the old version container images (#14978)

Why I did it
Our k8s feature pulls a new version of each container image on every upgrade, so the number of container images stored inside SONiC keeps growing. Because there is currently no way to clean up old version container images, the disk may fill up. We need to add logic to clean up the old version container images.

Work item tracking
Microsoft ADO (number only):
17979809
How I did it
Remove all old version container images except the feature's current-version and last-version images; the last-version image is kept to support fallback.

How to verify it
Check whether the old version images are removed
This commit is contained in:
lixiaoyuner 2023-05-19 01:37:34 +08:00 committed by mssonicbld
parent 7b6a7d8283
commit 8867d2459f
10 changed files with 270 additions and 16 deletions

View File

@ -27,6 +27,7 @@ REMOTE_STATE = "remote_state"
VERSION = "container_version"
SYSTEM_STATE = "system_state"
STATE = "state"
ST_FEAT_CTR_STABLE_VER = "container_stable_version"
KUBE_LABEL_TABLE = "KUBE_LABELS"
KUBE_LABEL_SET_KEY = "SET"
@ -132,6 +133,26 @@ def docker_action(action, feature, **kwargs):
return FAILURE
def container_version(feature):
    """Return the image version of the feature's running container.

    Reads the IMAGE_VERSION entry from the container's configured
    environment variables via the docker API.

    Args:
        feature: feature/container name (docker container name).

    Returns:
        The version string, or None when the container is missing,
        the docker API fails, or no IMAGE_VERSION env entry exists.
    """
    version = None
    try:
        client = docker.from_env()
        container = client.containers.get(feature)
        envs = container.attrs['Config']['Env']
        for env in envs:
            if env.startswith("IMAGE_VERSION="):
                # Split only on the first '=' so a version string that
                # itself contains '=' is not truncated.
                version = env.split('=', 1)[1]
        syslog.syslog(syslog.LOG_INFO, "docker get image version for {}".format(feature))
    except (docker.errors.NotFound, docker.errors.APIError) as err:
        syslog.syslog(syslog.LOG_ERR, "docker get image version for {} failed with {}".
                      format(feature, str(err)))
    return version
def set_label(feature, create):
""" Set/drop label as required
Update is done in state-db.
@ -374,6 +395,11 @@ def container_wait(feature, **kwargs):
pend_wait_secs = 0
ret = SUCCESS
if docker_id == feature:
version = container_version(feature)
if version:
update_data(feature, {ST_FEAT_CTR_STABLE_VER: version})
if not docker_id and fallback:
pend_wait_secs = get_config_data(
SONIC_CTR_CONFIG_PEND_SECS, DEFAULT_PEND_SECS)

View File

@ -36,6 +36,11 @@ def _get_version_key(feature, version):
return "{}_{}_enabled".format(feature, version)
def _get_local_version_key(feature):
# Coin label for track laster local version
return "{}_local".format(feature)
def read_data(feature):
state_data = {
CURRENT_OWNER: "none",
@ -87,8 +92,8 @@ def check_version_blocked(state_db, feature, version):
#
tbl = swsscommon.Table(state_db, KUBE_LABEL_TABLE)
labels = dict(tbl.get(KUBE_LABEL_SET_KEY)[1])
key = _get_version_key(feature, version)
return (key in labels) and (labels[key].lower() == "false")
key = _get_local_version_key(feature)
return (key in labels) and (labels[key].lower() == version.lower())
def drop_label(state_db, feature, version):
@ -97,8 +102,8 @@ def drop_label(state_db, feature, version):
# ctrmgrd sets it with kube API server per reachability
tbl = swsscommon.Table(state_db, KUBE_LABEL_TABLE)
name = _get_version_key(feature, version)
tbl.set(KUBE_LABEL_SET_KEY, [ (name, "false")])
name = _get_local_version_key(feature)
tbl.set(KUBE_LABEL_SET_KEY, [(name, version)])
def update_data(state_db, feature, data):

View File

@ -48,6 +48,8 @@ ST_FEAT_OWNER = "current_owner"
ST_FEAT_UPDATE_TS = "update_time"
ST_FEAT_CTR_ID = "container_id"
ST_FEAT_CTR_VER = "container_version"
ST_FEAT_CTR_STABLE_VER = "container_stable_version"
ST_FEAT_CTR_LAST_VER = "container_last_version"
ST_FEAT_REMOTE_STATE = "remote_state"
ST_FEAT_SYS_STATE = "system_state"
@ -59,6 +61,7 @@ MODE_LOCAL = "local"
OWNER_KUBE = "kube"
OWNER_LOCAL = "local"
OWNER_NONE = "none"
REMOTE_RUNNING = "running"
REMOTE_READY = "ready"
REMOTE_PENDING = "pending"
REMOTE_STOPPED = "stopped"
@ -91,6 +94,8 @@ dflt_st_feat= {
ST_FEAT_UPDATE_TS: "",
ST_FEAT_CTR_ID: "",
ST_FEAT_CTR_VER: "",
ST_FEAT_CTR_STABLE_VER: "",
ST_FEAT_CTR_LAST_VER: "",
ST_FEAT_REMOTE_STATE: "none",
ST_FEAT_SYS_STATE: ""
}
@ -100,18 +105,20 @@ JOIN_RETRY = "retry_join_interval_seconds"
LABEL_RETRY = "retry_labels_update_seconds"
TAG_IMAGE_LATEST = "tag_latest_image_on_wait_seconds"
TAG_RETRY = "retry_tag_latest_seconds"
CLEAN_IMAGE_RETRY = "retry_clean_image_seconds"
USE_K8S_PROXY = "use_k8s_as_http_proxy"
remote_ctr_config = {
JOIN_LATENCY: 10,
JOIN_RETRY: 10,
LABEL_RETRY: 2,
TAG_IMAGE_LATEST: 30,
TAG_IMAGE_LATEST: 5,
TAG_RETRY: 5,
CLEAN_IMAGE_RETRY: 5,
USE_K8S_PROXY: ""
}
ENABLED_FEATURE_SET = {"telemetry", "snmp"}
DISABLED_FEATURE_SET = {"database"}
def log_debug(m):
msg = "{}: {}".format(inspect.stack()[1][3], m)
@ -271,7 +278,7 @@ class MainServer:
key, op, fvs = subscriber.pop()
if not key:
continue
if subscriber.getTableName() == FEATURE_TABLE and key not in ENABLED_FEATURE_SET:
if subscriber.getTableName() == FEATURE_TABLE and key in DISABLED_FEATURE_SET:
continue
log_debug("Received message : '%s'" % str((key, op, fvs)))
for callback in (self.callbacks
@ -545,7 +552,7 @@ class FeatureTransitionHandler:
self.st_data[key] = _update_entry(dflt_st_feat, data)
remote_state = self.st_data[key][ST_FEAT_REMOTE_STATE]
if (old_remote_state != remote_state) and (remote_state == "running"):
if (remote_state == REMOTE_RUNNING) and (old_remote_state != remote_state):
# Tag latest
start_time = datetime.datetime.now() + datetime.timedelta(
seconds=remote_ctr_config[TAG_IMAGE_LATEST])
@ -583,6 +590,29 @@ class FeatureTransitionHandler:
log_debug("Tag latest as local failed retry after {} seconds @{}".
format(remote_ctr_config[TAG_RETRY], self.start_time))
else:
last_version = self.st_data[feat][ST_FEAT_CTR_STABLE_VER]
if last_version == image_ver:
last_version = self.st_data[feat][ST_FEAT_CTR_LAST_VER]
self.server.mod_db_entry(STATE_DB_NAME, FEATURE_TABLE, feat,
{ST_FEAT_CTR_STABLE_VER: image_ver,
ST_FEAT_CTR_LAST_VER: last_version})
self.st_data[ST_FEAT_CTR_LAST_VER] = last_version
self.st_data[ST_FEAT_CTR_STABLE_VER] = image_ver
self.do_clean_image(feat, image_ver, last_version)
def do_clean_image(self, feat, current_version, last_version):
ret = kube_commands.clean_image(feat, current_version, last_version)
if ret != 0:
# Clean up old version images failed. Retry after an interval
self.start_time = datetime.datetime.now()
self.start_time += datetime.timedelta(
seconds=remote_ctr_config[CLEAN_IMAGE_RETRY])
self.server.register_timer(self.start_time, self.do_clean_image, (feat, current_version, last_version))
log_debug("Clean up old version images failed retry after {} seconds @{}".
format(remote_ctr_config[CLEAN_IMAGE_RETRY], self.start_time))
#
# Label re-sync

View File

@ -483,6 +483,70 @@ def tag_latest(feat, docker_id, image_ver):
log_error(err)
return ret
def _do_clean(feat, current_version, last_version):
    """Remove old version images of a feature, keeping current and last versions.

    Lists all non-latest images of the feature, re-tags any registry-less
    (locally tagged) image under the registry repo so it survives cleanup,
    then force-removes every remaining version except current_version and
    last_version (the last version is kept to support fallback).

    Args:
        feat: feature name used to grep the docker image list.
        current_version: version currently running; never removed.
        last_version: previous version; never removed.

    Returns:
        (ret, out, err): ret is 0 on success, 1 on failure; out/err carry
        the human-readable success/error message.
    """
    err = ""
    out = ""
    ret = 0
    DOCKER_ID = "docker_id"
    REPO = "repo"
    # List every non-latest image of this feature as "<repo> <tag> <id>"
    _, image_info, err = _run_command("docker images |grep {} |grep -v latest |awk '{{print $1,$2,$3}}'".format(feat))
    if image_info:
        version_dict = {}
        version_dict_default = {}
        for info in image_info.split("\n"):
            rep, version, docker_id = info.split()
            if len(rep.split("/")) == 1:
                # No registry prefix in the repo: a locally tagged image
                version_dict_default[version] = {DOCKER_ID: docker_id, REPO: rep}
            else:
                version_dict[version] = {DOCKER_ID: docker_id, REPO: rep}
        if current_version in version_dict:
            image_prefix = version_dict[current_version][REPO]
            del version_dict[current_version]
        else:
            # Current version image not present: nothing safe to clean
            out = "Current version {} doesn't exist.".format(current_version)
            ret = 0
            return ret, out, err
        # should be only one item in version_dict_default
        for k, v in version_dict_default.items():
            local_version, local_repo, local_docker_id = k, v[REPO], v[DOCKER_ID]
            # Re-tag the local image under the registry repo (so it is tracked
            # like kube-pulled images) and drop the registry-less tag
            tag_res, _, err = _run_command("docker tag {} {}:{} && docker rmi {}:{}".format(
                local_docker_id, image_prefix, local_version, local_repo, local_version))
            if tag_res == 0:
                msg = "Tag {} local version images successfully".format(feat)
                log_debug(msg)
            else:
                ret = 1
                err = "Failed to tag {} local version images. Err: {}".format(feat, err)
                return ret, out, err
        if last_version in version_dict:
            # Keep the last version image to support fallback
            del version_dict[last_version]
        # Remaining entries are stale versions; collect their docker image IDs
        image_ids = [item[DOCKER_ID] for item in version_dict.values()]
        if image_ids:
            clean_res, _, err = _run_command("docker rmi {} --force".format(" ".join(image_ids)))
        else:
            clean_res = 0
        if clean_res == 0:
            out = "Clean {} old version images successfully".format(feat)
        else:
            err = "Failed to clean {} old version images. Err: {}".format(feat, err)
            ret = 1
    else:
        # Listing produced no output; report the command that actually ran
        err = "Failed to docker images |grep {} |grep -v latest |awk '{{print $1,$2,$3}}'".format(feat)
        ret = 1
    return ret, out, err
def clean_image(feat, current_version, last_version):
    """Clean up a feature's old version images and log the outcome.

    Returns 0 on success, non-zero on failure.
    """
    ret, out, err = _do_clean(feat, current_version, last_version)
    if ret:
        log_error(err)
    else:
        log_debug(out)
    return ret
def main():
syslog.openlog("kube_commands")
parser=argparse.ArgumentParser(description=

View File

@ -2,9 +2,10 @@
"join_latency_on_boot_seconds": 300,
"retry_join_interval_seconds": 30,
"retry_labels_update_seconds": 5,
"revert_to_local_on_wait_seconds": 60,
"revert_to_local_on_wait_seconds": 360,
"tag_latest_image_on_wait_seconds": 600,
"retry_tag_latest_seconds": 30,
"retry_clean_image_seconds": 30,
"use_k8s_as_http_proxy": "n"
}

View File

@ -185,6 +185,7 @@ class mock_container:
self.actions = []
self.name = name
self.image = mock_image(self.actions)
self.attrs = {"Config": {"Env": ["IMAGE_VERSION=20201231.11"]}}
def start(self):
@ -547,6 +548,19 @@ def set_mock_kube(kube_labels, kube_join, kube_reset):
kube_reset.side_effect = kube_reset_side_effect
def clean_image_side_effect(feat, current_version, last_version):
    # Mock for kube_commands.clean_image: always report success.
    return 0
def tag_latest_side_effect(feat, docker_id, image_ver):
    # Mock for kube_commands.tag_latest: always report success.
    return 0
def set_mock_image_op(clean_image, tag_latest):
    # Wire the image-operation mocks to their always-succeed stubs.
    clean_image.side_effect = clean_image_side_effect
    tag_latest.side_effect = tag_latest_side_effect
def str_comp(needle, hay):
nlen = len(needle)
hlen = len(hay)

View File

@ -36,7 +36,7 @@ startup_test_data = {
},
common_test.KUBE_LABEL_TABLE: {
"SET": {
"snmp_20201230.11_enabled": "false"
"snmp_local": "20201230.11"
}
}
}
@ -345,7 +345,7 @@ startup_test_data = {
},
common_test.KUBE_LABEL_TABLE: {
"SET": {
"snmp_20201230.11_enabled": "false"
"snmp_local": "20201230.11"
}
}
}
@ -363,7 +363,7 @@ startup_test_data = {
},
common_test.KUBE_LABEL_TABLE: {
"SET": {
"snmp_20201230.11_enabled": "false"
"snmp_local": "20201230.11"
}
}
}

View File

@ -416,7 +416,8 @@ wait_test_data = {
"remote_state": "none",
"system_state": "up",
"current_owner": "local",
"container_id": "snmp"
"container_id": "snmp",
"container_stable_version": "20201231.11"
}
}
}

View File

@ -291,7 +291,7 @@ feature_test_data = {
common_test.STATE_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"remote_state": "pending"
"remote_state": "ready"
}
}
}
@ -307,7 +307,19 @@ feature_test_data = {
common_test.STATE_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"remote_state": "running"
"remote_state": "running",
"container_version": "20201231.74",
"container_stable_version": "20201231.64"
}
}
}
},
common_test.POST: {
common_test.STATE_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"container_last_version": "20201231.64",
"container_stable_version": "20201231.74"
}
}
}
@ -473,13 +485,16 @@ class TestContainerStartup(object):
@patch("ctrmgrd.kube_commands.kube_reset_master")
@patch("ctrmgrd.kube_commands.kube_join_master")
@patch("ctrmgrd.kube_commands.kube_write_labels")
def test_feature(self, mock_kube_wr, mock_kube_join, mock_kube_rst, mock_subs,
@patch("ctrmgrd.kube_commands.tag_latest")
@patch("ctrmgrd.kube_commands.clean_image")
def test_feature(self, mock_clean_image, mock_tag_latest, mock_kube_wr, mock_kube_join, mock_kube_rst, mock_subs,
mock_select, mock_table, mock_conn):
self.init()
ret = 0
common_test.set_mock(mock_table, mock_conn)
common_test.set_mock_sel(mock_select, mock_subs)
common_test.set_mock_kube(mock_kube_wr, mock_kube_join, mock_kube_rst)
common_test.set_mock_image_op(mock_clean_image, mock_tag_latest)
for (i, ct_data) in feature_test_data.items():
common_test.do_start_test("ctrmgrd:feature", i, ct_data)

View File

@ -309,6 +309,93 @@ tag_latest_test_data = {
}
}
# Scenarios for kube_commands.clean_image: each case gives the mocked docker
# command sequence (PROC_CMD), its stdout (PROC_OUT) / exit codes (PROC_CODE),
# the clean_image arguments (feature, current_version, last_version), and the
# expected return value.
clean_image_test_data = {
    # Only kube-pulled (registry-prefixed) images present; the stale
    # 20201231.96 image is removed, current/last are kept.
    0: {
        common_test.DESCR: "Clean image successfuly(kube to kube)",
        common_test.RETVAL: 0,
        common_test.ARGS: ["snmp", "20201231.84", "20201231.74"],
        common_test.PROC_CMD: [
            "docker images |grep snmp |grep -v latest |awk '{print $1,$2,$3}'",
            "docker rmi 744d3a09062f --force"
        ],
        common_test.PROC_OUT: [
            "sonick8scue.azurecr.io/docker-sonic-telemetry 20201231.74 507f8d28bf6e\n\
sonick8scue.azurecr.io/docker-sonic-telemetry 20201231.96 744d3a09062f\n\
sonick8scue.azurecr.io/docker-sonic-telemetry 20201231.84 507f8d28bf6e",
            ""
        ],
        common_test.PROC_CODE: [
            0,
            0
        ]
    },
    # Same listing as case 0, but the "docker rmi" step fails (exit 1).
    1: {
        common_test.DESCR: "Clean image failed(delete image failed)",
        common_test.RETVAL: 1,
        common_test.ARGS: ["snmp", "20201231.84", "20201231.74"],
        common_test.PROC_CMD: [
            "docker images |grep snmp |grep -v latest |awk '{print $1,$2,$3}'",
            "docker rmi 744d3a09062f --force"
        ],
        common_test.PROC_OUT: [
            "sonick8scue.azurecr.io/docker-sonic-telemetry 20201231.74 507f8d28bf6e\n\
sonick8scue.azurecr.io/docker-sonic-telemetry 20201231.96 744d3a09062f\n\
sonick8scue.azurecr.io/docker-sonic-telemetry 20201231.84 507f8d28bf6e",
            ""
        ],
        common_test.PROC_CODE: [
            0,
            1
        ]
    },
    # "docker images" returns no output at all: listing failure, ret 1.
    2: {
        common_test.DESCR: "Clean image failed(no image found)",
        common_test.RETVAL: 1,
        common_test.ARGS: ["snmp", "20201231.84", "20201231.74"],
        common_test.PROC_CMD: [
            "docker images |grep snmp |grep -v latest |awk '{print $1,$2,$3}'"
        ],
        common_test.PROC_OUT: [
            ""
        ]
    },
    # current_version (20201231.84) is absent from the listing: clean_image
    # returns success without removing anything.
    3: {
        common_test.DESCR: "Clean image failed(current image doesn't exist)",
        common_test.RETVAL: 0,
        common_test.ARGS: ["snmp", "20201231.84", "20201231.74"],
        common_test.PROC_CMD: [
            "docker images |grep snmp |grep -v latest |awk '{print $1,$2,$3}'",
            ""
        ],
        common_test.PROC_OUT: [
            "sonick8scue.azurecr.io/docker-sonic-telemetry 20201231.74 507f8d28bf6e\n\
sonick8scue.azurecr.io/docker-sonic-telemetry 20201231.96 744d3a09062f",
            ""
        ],
        common_test.PROC_CODE: [
            0
        ]
    },
    # A registry-less local image exists: it is re-tagged under the registry
    # repo and the local tag dropped; no stale versions remain to remove.
    4: {
        common_test.DESCR: "Clean image successfuly(local to kube)",
        common_test.RETVAL: 0,
        common_test.ARGS: ["snmp", "20201231.84", ""],
        common_test.PROC_CMD: [
            "docker images |grep snmp |grep -v latest |awk '{print $1,$2,$3}'",
            "docker tag 507f8d28bf6e sonick8scue.azurecr.io/docker-sonic-telemetry:20201231.74 && docker rmi docker-sonic-telemetry:20201231.74"
        ],
        common_test.PROC_OUT: [
            "docker-sonic-telemetry 20201231.74 507f8d28bf6e\n\
sonick8scue.azurecr.io/docker-sonic-telemetry 20201231.84 507f8d28bf6e",
            ""
        ],
        common_test.PROC_CODE: [
            0,
            0
        ]
    },
}
class TestKubeCommands(object):
def init(self):
@ -467,3 +554,14 @@ clusters:\n\
ret = kube_commands.tag_latest(*ct_data[common_test.ARGS])
if common_test.RETVAL in ct_data:
assert ret == ct_data[common_test.RETVAL]
    @patch("kube_commands.subprocess.Popen")
    def test_clean_image(self, mock_subproc):
        """Run every clean_image scenario and verify its return code."""
        # Route all subprocess invocations through the shared kube mock,
        # which replays PROC_OUT/PROC_CODE per configured PROC_CMD.
        common_test.set_kube_mock(mock_subproc)

        for (i, ct_data) in clean_image_test_data.items():
            common_test.do_start_test("clean:image", i, ct_data)

            ret = kube_commands.clean_image(*ct_data[common_test.ARGS])
            if common_test.RETVAL in ct_data:
                assert ret == ct_data[common_test.RETVAL]