A Rust controller for Kubernetes

To teach myself Kubernetes in general and controllers in particular, I previously developed one in Java. This week, I decided to do the same in Rust, following the same steps.

  1. My first cup of Rust
  2. My second cup of Rust
  3. The Rustlings exercises — part 1
  4. The Rustlings exercises — part 2
  5. Rust on the front-end
  6. A Rust controller for Kubernetes (this post)

The guiding principle

The controller watches pods and, every time a pod is added, schedules a sidecar for it. I understand this would be better handled by the out-of-the-box admission controller, but it’s a good learning exercise. Because of that, I approached the development in several steps:

  1. Print the list of pods to the console. After the code is executed, the process stops. Kubernetes sees the pod as failing; it kills it and schedules a new one. Rinse and repeat.
  2. Make the program a loop so that it doesn’t exit
  3. Use a dedicated listener instead of a loop to be notified of lifecycle events
  4. Replace the logging with the actual scheduling of the sidecar

The original Java project had quite a few more steps, but they are not relevant for this post.

Setting up the project

With cargo, it's pretty straightforward:

cargo new rust-operator     # 1
  1. Create the rust-operator folder and scaffold a new skeleton project in it. Alternatively, use cargo init in an existing folder.

The initial structure is the following:

rust-operator/
├── Cargo.toml
└── src
    └── main.rs

With the following content:

[package]
name = "rust-operator" # 1
version = "0.1.0"
authors = ["Nicolas Frankel <nicolas@frankel.ch>"] # 2
edition = "2018" # 3
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies] # 4
  1. Package name
  2. Author taken from the global Git configuration
  3. Latest Rust edition
  4. Start with an empty dependency list

fn main() {
    println!("Hello, world!");
}

At this point, you can:

  1. Build the package: cargo b
  2. And run it: target/debug/rust-operator

As expected, it outputs:

Hello, world!

Logging

After some research, I found the log4rs crate:

log4rs is a highly configurable logging framework modeled after Java’s Logback and log4j libraries.

Given that I don’t have enough perspective on the Rust library ecosystem yet, I chose log4rs because its design is similar to Log4J.

To use log4rs, you need to add two dependencies:

[dependencies]
log4rs = "1.0.0" # 1
log = "0.4.14" # 2
  1. Add log4rs
  2. log is a lightweight logging facade that will use log4rs as its implementation

We can now replace the println! macro with a logging macro in the source file:

use log::info;
use log4rs;

fn main() {
    log4rs::init_file("log4rs.yml", Default::default()).unwrap(); // 1
    info!("Hello, world!"); // 2
}
  1. Initialize log4rs, expecting a log4rs.yml file
  2. Proper log

Now, running the binary might output a log or not, depending on the log4rs.yml file content. With my sample, it prints:

2021-07-05T16:26:16.041150+02:00 INFO rust_operator - Hello, world!

Calling Kubernetes API from Rust

The kube crate describes itself as follows:

Crate for interacting with the Kubernetes API

This crate includes the tools for manipulating Kubernetes resources as well as keeping track of those resources as they change over time

We also need the Rust bindings generated from the Open API specification:

[dependencies]
kube = "0.52.0"
k8s-openapi = { version = "0.11.0", default-features = false, features = ["v1_19"] } # 1
  1. The library uses features, one feature for each Kubernetes API version. Set the one relevant for your cluster version.

The entry point into the API is Client. To get a Client, the easiest way is to use Client::try_default(). As in the Java code:

Will use Config::infer to try in-cluster environment variables first, then fallback to the local kubeconfig.

Will fail if neither configuration could be loaded.

Asynchronous calls

fn main() {
    Client::try_default();
}

The snippet compiles but outputs a warning:

warning: unused implementer of `futures::Future` that must be used
--> src/main.rs:14:5
|
14 | Client::try_default();
| ^^^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(unused_must_use)]` on by default
= note: futures do nothing unless you `.await` or poll them

As the compiler noticed, a Future does nothing on its own. To use it, we have to await it.

fn main() {
    Client::try_default().await;
}

Now, the code fails with:

error[E0728]: `await` is only allowed inside `async` functions and blocks
--> src/main.rs:14:18
|
12 | fn main() {
| ---- this is not `async`
13 | log4rs::init_file("log4rs.yml", Default::default()).unwrap();
14 | let client = Client::try_default().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks

The lesson is that you can only await async functions from other async functions (and blocks), and main is not async. The easiest path is to bring in Tokio, a crate dedicated to async programming. Tokio offers a macro to make main asynchronous.

#[tokio::main]
async fn main() {
    Client::try_default().await;
}
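The compiler's note that futures do nothing unless awaited or polled can be observed without Kubernetes or Tokio. The following is a minimal sketch using only the standard library. The connect function is a hypothetical stand-in for an async call such as Client::try_default(), and block_on is a toy executor written for this illustration. It is not how Tokio is implemented: a real runtime parks the thread and schedules many tasks instead of spinning on one.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Set to true once the body of `connect` has actually run.
static CONNECTED: AtomicBool = AtomicBool::new(false);

/// Hypothetical stand-in for an async call such as `Client::try_default()`.
async fn connect() -> u32 {
    CONNECTED.store(true, Ordering::SeqCst);
    42
}

/// A waker that does nothing; enough for a loop that simply polls again.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor (an assumption for this sketch): polls the future in a loop
/// until it completes. A real runtime would park the thread instead of spinning.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut context = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut context) {
            return output;
        }
    }
}
```

Calling connect() only builds the future: CONNECTED stays false until the future is handed to block_on, which drives it to completion and returns its value. In the controller, the #[tokio::main] macro takes care of running the async main on a runtime.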

The last step is to unwrap the Client contained inside the Result.

#[tokio::main]
async fn main() {
    let client = Client::try_default().await.unwrap();
}

Listing pods

Let’s start small and list pods for now. Api::list() requires a ListParams parameter, which implements Default. Hence, the code is quite straightforward:

#[tokio::main]
async fn main() {
    let client = Client::try_default().await.unwrap();
    let api: Api<Pod> = Api::namespaced(client, "kube-system"); // 1
    api.list(&ListParams::default())
        .await
        .unwrap()
        .items
        .iter()
        .map(|pod| pod.name())
        .for_each(|name| info!("{}", name));
}
  1. All pods objects in the kube-system namespace

The code works as expected!
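As an aside on why a bare default() is enough here: a type that implements Default can be created with every option unset, and refined only when needed. The sketch below illustrates the idea with a hypothetical QueryOptions struct. It is loosely inspired by ListParams but is not the kube type, and its field names are assumptions made for the example.

```rust
/// Hypothetical query options, loosely modeled on the idea behind `ListParams`.
/// This is NOT the kube type; the fields are assumptions for illustration.
#[derive(Debug, Default, Clone, PartialEq)]
struct QueryOptions {
    label_selector: Option<String>,
    limit: Option<u32>,
}

impl QueryOptions {
    /// Builder-style setter that keeps whatever `self` already holds.
    fn labels(mut self, selector: &str) -> Self {
        self.label_selector = Some(selector.to_string());
        self
    }
}
```

QueryOptions::default() yields a value with no selector and no limit, which reads as "everything". Chaining QueryOptions::default().labels("app=web") narrows the query without having to spell out the other fields.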

Watching pods

#[tokio::main]
async fn main() {
    let client = Client::try_default().await.unwrap();
    let api: Api<Pod> = Api::namespaced(client, "kube-system");
    let mut stream = api.watch(&ListParams::default(), "0") // 1
        .await? // 2
        .boxed(); // 3
    while let Some(event) = stream
        .try_next() // 4
        .await? { // 5
        match event {
            _ => {}
        };
    }
}
  1. Call watch() that returns a Result
  2. Await to either get the underlying Stream or return an Error
  3. Box the Stream. At the time of this writing, my understanding of pinning is zero, so assume that it's needed and works.
  4. Try to get the next item in the Stream wrapped in a Result
  5. Either get the underlying WatchEvent or return an Error

Yet, compilation fails because of await?:

error[E0277]: the `?` operator can only be used in an async block that returns `Result` or `Option` (or another type that implements `Try`)
--> src/main.rs:12:22
|
8 | async fn main() {
| _________________-
9 | | let client = Client::try_default().await.unwrap();
10 | | let api: Api<Pod> = Api::namespaced(client, "kube-system");
11 | | let mut stream = api.watch(&ListParams::default(), "0").await?.boxed();
| | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ cannot use the `?` operator in an async block that returns `()`
12 | | while let Some(event) = stream.try_next().await? {
13 | | }
14 | | }
| |_- this function should return `Result` or `Option` to accept `?`
|
= help: the trait `Try` is not implemented for `()`
= note: required by `from_error`

Remember that a Result contains either a regular value or a failure, usually an Error. The ? operator is a shortcut on Result that either:

  • unwraps the regular value so you can proceed further
  • returns the failure from the current function

In the above snippet, the main() function does not define any return type. To fix the compilation problem, we need to add it:

#[tokio::main]
async fn main() -> Result<(), Error> { // 1
    let client = Client::try_default().await.unwrap();
    let api: Api<Pod> = Api::namespaced(client, "kube-system");
    let mut stream = api.watch(&ListParams::default(), "0").await?.boxed();
    while let Some(event) = stream.try_next().await? {
        match event {
            WatchEvent::Added(pod) => info!("ADDED: {}", pod.name()),
            WatchEvent::Modified(pod) => info!("UPDATED: {}", pod.name()),
            WatchEvent::Deleted(pod) => info!("DELETED: {}", pod.name()),
            WatchEvent::Error(e) => error!("ERROR: {} {} ({})", e.code, e.message, e.status),
            _ => {}
        };
    }
    Ok(()) // 2
}
  1. Define an empty Result to return
  2. Necessary to compile successfully
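The behavior of the ? operator described above can be tried without a cluster. Here is a small sketch using only the standard library: parse_port and parse_port_explicit are hypothetical helpers written for this illustration, the first using ?, the second spelling out roughly what ? does by hand. One detail the hand-written version leaves out is that ? also converts the error with From when the function's error type differs.

```rust
use std::num::ParseIntError;

/// Parses a port number; `?` returns early with the error if parsing fails.
fn parse_port(raw: &str) -> Result<u16, ParseIntError> {
    let port = raw.trim().parse::<u16>()?;
    Ok(port)
}

/// Roughly the same logic written out by hand, without `?`.
fn parse_port_explicit(raw: &str) -> Result<u16, ParseIntError> {
    match raw.trim().parse::<u16>() {
        // Unwrap the regular value so the function can proceed.
        Ok(port) => Ok(port),
        // Return the failure from the current function.
        Err(error) => return Err(error),
    }
}
```

Both functions return Ok(8080) for "8080" and an Err for input such as "not-a-port". Because they return a Result, ? is allowed inside them, which is exactly what the added return type buys us in main.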

The rest of the controller code is pretty straightforward: every time a pod is added, if it’s not a sidecar, add a sidecar to the pod, and make the latter its owner.
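To make that decision logic concrete, here is a sketch that runs on plain in-memory types instead of the Kubernetes API. Everything in it is hypothetical: PodInfo, PodEvent, the sidecar=true label convention, and the owner label are assumptions for illustration. In particular, a real controller would record ownership through the pod's owner references rather than through a label.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified view of a pod: just a name and its labels.
#[derive(Debug, Clone, PartialEq)]
struct PodInfo {
    name: String,
    labels: BTreeMap<String, String>,
}

/// Hypothetical event type mirroring the watch events handled above.
#[derive(Debug)]
enum PodEvent {
    Added(PodInfo),
    Modified(PodInfo),
    Deleted(PodInfo),
}

/// Assumed convention: sidecars carry a `sidecar=true` label.
fn is_sidecar(pod: &PodInfo) -> bool {
    pod.labels.get("sidecar").map(String::as_str) == Some("true")
}

/// Returns the sidecar to create for this event, if any.
fn sidecar_for(event: &PodEvent) -> Option<PodInfo> {
    match event {
        // Only newly added pods that are not sidecars themselves get one.
        PodEvent::Added(pod) if !is_sidecar(pod) => {
            let mut labels = BTreeMap::new();
            labels.insert("sidecar".to_string(), "true".to_string());
            // Stand-in for an owner reference pointing at the original pod.
            labels.insert("owner".to_string(), pod.name.clone());
            Some(PodInfo {
                name: format!("{}-sidecar", pod.name),
                labels,
            })
        }
        _ => None,
    }
}
```

The is_sidecar check matters: creating a sidecar triggers an Added event of its own, and without the check the controller would keep adding sidecars to sidecars.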

Containerizing the controller

FROM ekidd/rust-musl-builder:1.51.0 as build

WORKDIR /app

COPY src src
COPY Cargo.lock .
COPY Cargo.toml .

RUN cargo build --release # 1

FROM scratch # 2

WORKDIR /app

COPY --from=build /app/target/x86_64-unknown-linux-musl/release/rust-operator /app
COPY log4rs.yml . # 3

CMD ["./rust-operator"]
  1. Build the binary as a release
  2. Start from scratch for the smallest size possible
  3. Don’t forget the logging configuration file

IMHO, the final size is good enough:

REPOSITORY      TAG      IMAGE ID       CREATED      SIZE
rust-operator   latest   5cac942d46a0   1 hour ago   18MB

Conclusion

In this post, we went through the following steps:

  1. Setting up a project
  2. Adding the few needed dependencies
  3. Coding
  4. Configuring a multi-stage build
  5. Enjoy!

Besides readable code, the most significant benefit is all the Rust compiler’s hints to generate safe code.

The complete source code for this post can be found on GitHub.

Originally published at A Java Geek on July 11th, 2021