Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ClusterResources for Listener controller #232

Merged
merged 5 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ All notable changes to this project will be documented in this file.

- `Listener.status.addresses` for NodePort listeners now includes replicas that are currently unavailable ([#231]).
- `Listener.status.addresses` now defaults to DNS hostnames for all service types (previously NodePort and ClusterIP would prefer IP addresses, [#233]).
- Stale Listener subobjects will now be deleted ([#232]).
- Tagged Listener Services with the SDP labels ([#232]).

### Fixed

Expand All @@ -20,6 +22,7 @@ All notable changes to this project will be documented in this file.
- Propagate `ListenerClass.spec.serviceAnnotations` to the created Services ([#234]).

[#231]: https://github.com/stackabletech/listener-operator/pull/231
[#232]: https://github.com/stackabletech/listener-operator/pull/232
[#233]: https://github.com/stackabletech/listener-operator/pull/233
[#234]: https://github.com/stackabletech/listener-operator/pull/234

Expand Down
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions Cargo.nix

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ prost = "0.13"
prost-types = "0.13"
serde = "1.0"
snafu = "0.8"
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.78.0" }
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.79.0" }
strum = { version = "0.26", features = ["derive"] }
socket2 = { version = "0.5", features = ["all"] }
tokio = { version = "1.40", features = ["full"] }
Expand All @@ -33,4 +33,4 @@ tracing = "0.1.40"

[patch."https://github.com/stackabletech/operator-rs.git"]
# stackable-operator = { path = "../operator-rs/crates/stackable-operator" }
stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "main" }
# stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "main" }
6 changes: 3 additions & 3 deletions crate-hashes.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

116 changes: 93 additions & 23 deletions rust/operator-binary/src/listener_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,23 @@ use futures::{
};
use snafu::{OptionExt, ResultExt, Snafu};
use stackable_operator::{
builder::meta::OwnerReferenceBuilder,
builder::meta::ObjectMetaBuilder,
cluster_resources::{ClusterResourceApplyStrategy, ClusterResources},
commons::listener::{
AddressType, Listener, ListenerClass, ListenerIngress, ListenerPort, ListenerSpec,
ListenerStatus, ServiceType,
},
iter::TryFromIterator,
k8s_openapi::{
api::core::v1::{Endpoints, Node, PersistentVolume, Service, ServicePort, ServiceSpec},
apimachinery::pkg::apis::meta::v1::LabelSelector,
},
kube::{
api::{DynamicObject, ObjectMeta},
runtime::{controller, reflector::ObjectRef, watcher},
ResourceExt,
Resource, ResourceExt,
},
kvp::{Annotations, Labels},
logging::controller::{report_controller_reconciled, ReconcilerError},
time::Duration,
};
Expand All @@ -31,12 +34,13 @@ use strum::IntoStaticStr;
use crate::{
csi_server::node::NODE_TOPOLOGY_LABEL_HOSTNAME,
utils::address::{node_primary_addresses, AddressCandidates},
APP_NAME, OPERATOR_KEY,
};

#[cfg(doc)]
use stackable_operator::k8s_openapi::api::core::v1::Pod;

const FIELD_MANAGER_SCOPE: &str = "listener";
const CONTROLLER_NAME: &str = "listener";

pub async fn run(client: stackable_operator::client::Client) {
let controller =
Expand Down Expand Up @@ -115,6 +119,11 @@ pub enum Error {
#[snafu(display("object has no name"))]
NoName,

#[snafu(display("failed to create cluster resources"))]
CreateClusterResources {
source: stackable_operator::cluster_resources::Error,
},

#[snafu(display("object has no ListenerClass (.spec.class_name)"))]
NoListenerClass,

Expand All @@ -133,6 +142,22 @@ pub enum Error {
source: stackable_operator::client::Error,
},

#[snafu(display("failed to validate labels passed through from Listener"))]
ValidateListenerLabels {
source: stackable_operator::kvp::LabelError,
},

#[snafu(display("failed to validate annotations specified by {listener_class}"))]
ValidateListenerClassAnnotations {
source: stackable_operator::kvp::AnnotationError,
listener_class: ObjectRef<ListenerClass>,
},

#[snafu(display("failed to build cluster resource labels"))]
BuildClusterResourcesLabels {
source: stackable_operator::kvp::LabelError,
},

#[snafu(display("failed to get {obj}"))]
GetObject {
source: stackable_operator::client::Error,
Expand All @@ -146,10 +171,15 @@ pub enum Error {

#[snafu(display("failed to apply {svc}"))]
ApplyService {
source: stackable_operator::client::Error,
source: stackable_operator::cluster_resources::Error,
svc: ObjectRef<Service>,
},

#[snafu(display("failed to delete orphaned resources"))]
DeleteOrphans {
source: stackable_operator::cluster_resources::Error,
},

#[snafu(display("failed to apply status for Listener"))]
ApplyStatus {
source: stackable_operator::client::Error,
Expand All @@ -165,19 +195,37 @@ impl ReconcilerError for Error {
match self {
Self::NoNs => None,
Self::NoName => None,
Self::CreateClusterResources { source: _ } => None,
Self::NoListenerClass => None,
Self::ListenerPvSelector { source: _ } => None,
Self::ListenerPodSelector { source: _ } => None,
Self::GetListenerPvs { source: _ } => None,
Self::ValidateListenerLabels { source: _ } => None,
Self::ValidateListenerClassAnnotations {
source: _,
listener_class,
} => Some(listener_class.clone().erase()),
Self::BuildClusterResourcesLabels { source: _ } => None,
Self::GetObject { source: _, obj } => Some(obj.clone()),
Self::BuildListenerOwnerRef { .. } => None,
Self::ApplyService { source: _, svc } => Some(svc.clone().erase()),
Self::DeleteOrphans { source: _ } => None,
Self::ApplyStatus { source: _ } => None,
}
}
}

pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<controller::Action> {
let mut cluster_resources = ClusterResources::new(
APP_NAME,
OPERATOR_KEY,
CONTROLLER_NAME,
&listener.object_ref(&()),
// Listeners don't currently support pausing
ClusterResourceApplyStrategy::Default,
)
.context(CreateClusterResourcesSnafu)?;

let ns = listener.metadata.namespace.as_deref().context(NoNsSnafu)?;
let listener_class_name = listener
.spec
Expand Down Expand Up @@ -231,18 +279,36 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
};

let svc = Service {
metadata: ObjectMeta {
namespace: Some(ns.to_string()),
name: Some(svc_name.clone()),
owner_references: Some(vec![OwnerReferenceBuilder::new()
.initialize_from_resource(&*listener)
.build()
.context(BuildListenerOwnerRefSnafu)?]),
// Propagate the labels from the Listener object to the Service object, so it can be found easier
labels: listener.metadata.labels.clone(),
annotations: Some(listener_class.spec.service_annotations),
..Default::default()
},
metadata: ObjectMetaBuilder::new()
.namespace(ns)
.name(&svc_name)
.ownerreference_from_resource(&*listener, Some(true), Some(true))
.context(BuildListenerOwnerRefSnafu)?
.with_labels(
Labels::try_from(
listener
.metadata
.labels
.as_ref()
.unwrap_or(&BTreeMap::new()),
)
.context(ValidateListenerLabelsSnafu)?,
)
.with_labels(
cluster_resources
// Not using Labels::recommended, since it carries a bunch of extra information that is
// only relevant for stacklets (such as rolegroups and product versions).
.get_required_labels()
.context(BuildClusterResourcesLabelsSnafu)?,
)
.with_annotations(
Annotations::try_from_iter(&listener_class.spec.service_annotations).context(
ValidateListenerClassAnnotationsSnafu {
listener_class: ObjectRef::from_obj(&listener_class),
},
)?,
)
.build(),
spec: Some(ServiceSpec {
// We explicitly match here and do not implement `ToString` as there might be more (non vanilla k8s Service
// types) in the future.
Expand All @@ -264,13 +330,11 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
}),
..Default::default()
};
let svc = ctx
.client
.apply_patch(FIELD_MANAGER_SCOPE, &svc, &svc)
let svc_ref = ObjectRef::from_obj(&svc);
let svc = cluster_resources
.add(&ctx.client, svc)
.await
.with_context(|_| ApplyServiceSnafu {
svc: ObjectRef::from_obj(&svc),
})?;
.context(ApplyServiceSnafu { svc: svc_ref })?;

let nodes: Vec<Node>;
let kubernetes_service_fqdn: String;
Expand Down Expand Up @@ -376,8 +440,14 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
),
node_ports: (listener_class.spec.service_type == ServiceType::NodePort).then_some(ports),
};

cluster_resources
.delete_orphaned_resources(&ctx.client)
.await
.context(DeleteOrphansSnafu)?;

ctx.client
.apply_patch_status(FIELD_MANAGER_SCOPE, &listener_status_meta, &listener_status)
.apply_patch_status(CONTROLLER_NAME, &listener_status_meta, &listener_status)
.await
.context(ApplyStatusSnafu)?;

Expand Down
4 changes: 3 additions & 1 deletion rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod csi_server;
mod listener_controller;
mod utils;

const APP_NAME: &str = "listener";
const OPERATOR_KEY: &str = "listeners.stackable.tech";

#[derive(clap::Parser)]
Expand Down Expand Up @@ -84,7 +85,8 @@ async fn main() -> anyhow::Result<()> {
built_info::RUSTC_VERSION,
);
let client =
stackable_operator::client::create_client(Some(OPERATOR_KEY.to_string())).await?;
stackable_operator::client::initialize_operator(Some(OPERATOR_KEY.to_string()))
.await?;
if csi_endpoint
.symlink_metadata()
.map_or(false, |meta| meta.file_type().is_socket())
Expand Down
Loading