Skip to content

Commit

Permalink
track updates to objects
Browse files Browse the repository at this point in the history
  • Loading branch information
drmorr0 committed Sep 19, 2023
1 parent daabb40 commit c70ec12
Show file tree
Hide file tree
Showing 20 changed files with 307 additions and 233 deletions.
6 changes: 3 additions & 3 deletions ctrl/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1;
use kube::runtime::controller::Action;
use kube::ResourceExt;
use reqwest::Url;
use simkube::prelude::*;
use simkube::trace::storage;
use simkube::util::{
use simkube::k8s::{
add_common_fields,
namespaced_name,
};
use simkube::prelude::*;
use simkube::trace::storage;
use tokio::time::Duration;
use tracing::*;

Expand Down
44 changes: 21 additions & 23 deletions driver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,22 @@ use anyhow::bail;
use clap::Parser;
use k8s_openapi::api::core::v1 as corev1;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1;
use kube::api::DynamicObject;
use kube::api::{
DynamicObject,
Patch,
PatchParams,
};
use kube::ResourceExt;
use serde_json::json;
use simkube::json::{
patch_ext_add,
patch_ext_remove,
};
use simkube::prelude::*;
use simkube::trace::Tracer;
use simkube::util::{
use simkube::jsonutils;
use simkube::k8s::{
add_common_fields,
get_api_resource,
prefixed_ns,
GVKKey,

Check warning on line 26 in driver/main.rs

View check run for this annotation

Codecov / codecov/patch

driver/main.rs#L26

Added line #L26 was not covered by tests
};
use simkube::prelude::*;
use simkube::trace::Tracer;

Check warning on line 29 in driver/main.rs

View check run for this annotation

Codecov / codecov/patch

driver/main.rs#L29

Added line #L29 was not covered by tests
use tokio::time::sleep;
use tracing::*;

Expand Down Expand Up @@ -74,16 +76,16 @@ fn build_virtual_obj(
};
let psp = &config.tracked_objects[&gvk].pod_spec_path;

patch_ext_add(psp, "nodeSelector", &json!({"type": "virtual"}), &mut vobj.data, true)?;
patch_ext_add(psp, "tolerations", &json!([]), &mut vobj.data, false)?;
patch_ext_add(
jsonutils::patch_ext::add(psp, "nodeSelector", &json!({"type": "virtual"}), &mut vobj.data, true)?;
jsonutils::patch_ext::add(psp, "tolerations", &json!([]), &mut vobj.data, false)?;
jsonutils::patch_ext::add(

Check warning on line 81 in driver/main.rs

View check run for this annotation

Codecov / codecov/patch

driver/main.rs#L79-L81

Added lines #L79 - L81 were not covered by tests
&format!("{}/tolerations", psp),
"-",
&json!({"key": "simkube.io/virtual-node", "value": "true"}),
&mut vobj.data,
true,
)?;
patch_ext_remove(psp, "status", &mut vobj.data)?;
jsonutils::patch_ext::remove(psp, "status", &mut vobj.data)?;

Check warning on line 88 in driver/main.rs

View check run for this annotation

Codecov / codecov/patch

driver/main.rs#L88

Added line #L88 was not covered by tests
add_common_fields(sim_name, root, &mut vobj)?;

Ok(vobj)
Expand All @@ -104,11 +106,8 @@ async fn run(args: &Options) -> anyhow::Result<()> {

let mut sim_ts = tracer.start_ts().expect("no trace data");
for (evt, next_ts) in tracer.iter() {
for obj in evt.created_objs {
let key = match &obj.types {
Some(t) => GVKKey { gvk: t.try_into()? },
None => bail!("no type data present"),
};
for obj in evt.applied_objs {
let key = GVKKey::from_dynamic_obj(&obj)?;

Check warning on line 110 in driver/main.rs

View check run for this annotation

Codecov / codecov/patch

driver/main.rs#L109-L110

Added lines #L109 - L110 were not covered by tests
let vns_name = prefixed_ns(&args.sim_namespace_prefix, &obj);
let obj_api = match obj_apis.entry((key.clone(), vns_name.clone())) {
Entry::Vacant(e) => {
Expand All @@ -122,16 +121,15 @@ async fn run(args: &Options) -> anyhow::Result<()> {

let vobj = build_virtual_obj(&obj, &vns_name, &args.sim_name, &root, tracer.config())?;

info!("creating object {:?}", vobj);
obj_api.create(&Default::default(), &vobj).await?;
info!("applying object {:?}", vobj);
obj_api
.patch(&vobj.name_any(), &PatchParams::apply("simkube"), &Patch::Apply(&vobj))
.await?;

Check warning on line 127 in driver/main.rs

View check run for this annotation

Codecov / codecov/patch

driver/main.rs#L124-L127

Added lines #L124 - L127 were not covered by tests
}

for obj in evt.deleted_objs {
info!("deleting pod {}", obj.name_any());
let key = match &obj.types {
Some(t) => GVKKey { gvk: t.try_into()? },
None => bail!("no type data present"),
};
let key = GVKKey::from_dynamic_obj(&obj)?;

Check warning on line 132 in driver/main.rs

View check run for this annotation

Codecov / codecov/patch

driver/main.rs#L132

Added line #L132 was not covered by tests
let vns_name = prefixed_ns(&args.sim_namespace_prefix, &obj);
match obj_apis.get(&(key, vns_name)) {
Some(obj_api) => _ = obj_api.delete(&obj.name_any(), &Default::default()).await?,
Expand Down
60 changes: 6 additions & 54 deletions lib/rust/config.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
use std::collections::HashMap;
use std::fmt;
use std::fs::File;

use kube::api::GroupVersionKind;
use serde::{
de,
Deserialize,
Deserializer,
Serialize,
Serializer,
};

use crate::k8s::GVKKey;

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct TrackedObject {
pub pod_spec_path: String,
pub watched_fields: Vec<String>,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
Expand All @@ -23,53 +20,8 @@ pub struct TracerConfig {
pub tracked_objects: HashMap<GVKKey, TrackedObject>,
}

#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct GVKKey {
pub gvk: GroupVersionKind,
}

impl Serialize for GVKKey {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let skey = format!("{}/{}.{}", self.gvk.group, self.gvk.version, self.gvk.kind);
serializer.serialize_str(&skey)
}
}

struct ObjectKeyVisitor;

impl<'de> de::Visitor<'de> for ObjectKeyVisitor {
type Value = GVKKey;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a GroupVersionKind in the format group/version.kind")
}

fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
let parts: Vec<_> = value.split(|c| c == '/' || c == '.').collect();
if parts.len() != 3 {
return Err(E::custom(format!("invalid format for gvk: {}", value)));
}
Ok(GVKKey {
gvk: GroupVersionKind {
group: parts[0].into(),
version: parts[1].into(),
kind: parts[2].into(),
},
})
}
}

impl<'de> Deserialize<'de> for GVKKey {
fn deserialize<D>(deserializer: D) -> Result<GVKKey, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_str(ObjectKeyVisitor)
impl TracerConfig {
pub fn load(filename: &str) -> anyhow::Result<TracerConfig> {
Ok(serde_yaml::from_reader(File::open(filename)?)?)

Check warning on line 25 in lib/rust/config.rs

View check run for this annotation

Codecov / codecov/patch

lib/rust/config.rs#L24-L25

Added lines #L24 - L25 were not covered by tests
}
}
9 changes: 0 additions & 9 deletions lib/rust/json/mod.rs

This file was deleted.

18 changes: 18 additions & 0 deletions lib/rust/jsonutils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
pub mod patch_ext;

use std::collections::hash_map::DefaultHasher;
use std::hash::{
Hash,
Hasher,
};

use serde_json as json;

pub fn hash(value: &json::Value) -> anyhow::Result<u64> {
let mut s = DefaultHasher::new();
Hash::hash_slice(&serde_json::to_vec(value)?, &mut s);
Ok(s.finish())
}

#[cfg(test)]
mod patch_ext_test;
14 changes: 7 additions & 7 deletions lib/rust/json/patch_ext.rs → lib/rust/jsonutils/patch_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ err_impl! {JsonPatchError,
UnexpectedType(String),
}

pub fn patch_ext_add(path: &str, key: &str, value: &Value, obj: &mut Value, overwrite: bool) -> anyhow::Result<()> {
pub fn add(path: &str, key: &str, value: &Value, obj: &mut Value, overwrite: bool) -> anyhow::Result<()> {
let parts: Vec<_> = path.split('*').collect();
for v in patch_ext_helper(&parts, obj).ok_or(JsonPatchError::invalid_pointer(path))? {
for v in patch_ext_helper_mut(&parts, obj).ok_or(JsonPatchError::invalid_pointer(path))? {
match v {
Value::Object(map) => {
if overwrite || !map.contains_key(key) {
Expand All @@ -37,16 +37,16 @@ pub fn patch_ext_add(path: &str, key: &str, value: &Value, obj: &mut Value, over
Ok(())
}

pub fn patch_ext_remove(path: &str, key: &str, obj: &mut Value) -> anyhow::Result<()> {
let parts: Vec<&str> = path.split('*').collect();
for v in patch_ext_helper(&parts, obj).ok_or(JsonPatchError::invalid_pointer(path))? {
pub fn remove(path: &str, key: &str, obj: &mut Value) -> anyhow::Result<()> {
let parts: Vec<_> = path.split('*').collect();
for v in patch_ext_helper_mut(&parts, obj).ok_or(JsonPatchError::invalid_pointer(path))? {
v.as_object_mut().ok_or(JsonPatchError::unexpected_type(path))?.remove(key);
}

Ok(())
}

fn patch_ext_helper<'a>(parts: &[&str], value: &'a mut Value) -> Option<Vec<&'a mut Value>> {
fn patch_ext_helper_mut<'a>(parts: &[&str], value: &'a mut Value) -> Option<Vec<&'a mut Value>> {
if parts.len() == 1 {
return Some(vec![value.pointer_mut(parts[0])?]);
}
Expand All @@ -58,7 +58,7 @@ fn patch_ext_helper<'a>(parts: &[&str], value: &'a mut Value) -> Option<Vec<&'a
let len = parts[0].len();
let next_array_val = value.pointer_mut(&parts[0][..len - 1])?.as_array_mut()?;
for v in next_array_val {
let cons = patch_ext_helper(&parts[1..], v)?;
let cons = patch_ext_helper_mut(&parts[1..], v)?;
res.extend(cons);
}
Some(res)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use serde_json::{
Value,
};

use super::patch_ext::*;
use super::*;

#[fixture]
fn data() -> Value {
Expand All @@ -23,7 +23,7 @@ fn data() -> Value {
#[case::no_overwrite(false)]
fn test_patch_ext_add(mut data: Value, #[case] overwrite: bool) {
let path = "/foo/*/baz";
let res = patch_ext_add(path, "buzz", &json!(42), &mut data, overwrite);
let res = patch_ext::add(path, "buzz", &json!(42), &mut data, overwrite);
assert!(res.is_ok());
assert_eq!(
data,
Expand All @@ -40,7 +40,7 @@ fn test_patch_ext_add(mut data: Value, #[case] overwrite: bool) {
#[rstest]
fn test_patch_ext_remove(mut data: Value) {
let path = "/foo/*/baz";
let res = patch_ext_remove(path, "quzz", &mut data);
let res = patch_ext::remove(path, "quzz", &mut data);
assert!(res.is_ok());
assert_eq!(
data,
Expand Down
75 changes: 75 additions & 0 deletions lib/rust/k8s/gvk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::fmt;

use kube::api::{
DynamicObject,
GroupVersionKind,
};
use serde::{
de,
Deserialize,
Deserializer,
Serialize,
Serializer,
};

use crate::errors::*;

#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct GVKKey {
pub gvk: GroupVersionKind,
}

impl GVKKey {
pub fn from_dynamic_obj(obj: &DynamicObject) -> anyhow::Result<Self> {
match &obj.types {
Some(t) => Ok(GVKKey { gvk: t.try_into()? }),
None => bail!("no type data present"),

Check warning on line 26 in lib/rust/k8s/gvk.rs

View check run for this annotation

Codecov / codecov/patch

lib/rust/k8s/gvk.rs#L23-L26

Added lines #L23 - L26 were not covered by tests
}
}

Check warning on line 28 in lib/rust/k8s/gvk.rs

View check run for this annotation

Codecov / codecov/patch

lib/rust/k8s/gvk.rs#L28

Added line #L28 was not covered by tests
}

impl Serialize for GVKKey {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let skey = format!("{}/{}.{}", self.gvk.group, self.gvk.version, self.gvk.kind);
serializer.serialize_str(&skey)
}

Check warning on line 38 in lib/rust/k8s/gvk.rs

View check run for this annotation

Codecov / codecov/patch

lib/rust/k8s/gvk.rs#L32-L38

Added lines #L32 - L38 were not covered by tests
}

struct ObjectKeyVisitor;

impl<'de> de::Visitor<'de> for ObjectKeyVisitor {
type Value = GVKKey;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a GroupVersionKind in the format group/version.kind")
}

Check warning on line 48 in lib/rust/k8s/gvk.rs

View check run for this annotation

Codecov / codecov/patch

lib/rust/k8s/gvk.rs#L46-L48

Added lines #L46 - L48 were not covered by tests

fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
let parts: Vec<_> = value.split(|c| c == '/' || c == '.').collect();
if parts.len() != 3 {
return Err(E::custom(format!("invalid format for gvk: {}", value)));
}
Ok(GVKKey {
gvk: GroupVersionKind {
group: parts[0].into(),
version: parts[1].into(),
kind: parts[2].into(),
},
})
}

Check warning on line 65 in lib/rust/k8s/gvk.rs

View check run for this annotation

Codecov / codecov/patch

lib/rust/k8s/gvk.rs#L50-L65

Added lines #L50 - L65 were not covered by tests
}

impl<'de> Deserialize<'de> for GVKKey {
fn deserialize<D>(deserializer: D) -> Result<GVKKey, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_str(ObjectKeyVisitor)
}

Check warning on line 74 in lib/rust/k8s/gvk.rs

View check run for this annotation

Codecov / codecov/patch

lib/rust/k8s/gvk.rs#L69-L74

Added lines #L69 - L74 were not covered by tests
}
8 changes: 8 additions & 0 deletions lib/rust/k8s/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
mod gvk;
mod util;

pub use gvk::*;
pub use util::*;

#[cfg(test)]
mod util_test;
Loading

0 comments on commit c70ec12

Please sign in to comment.