From dc76e01e9241735e3e4209a432754ebd02b092e5 Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Mon, 15 Apr 2024 20:41:01 +0200 Subject: [PATCH] more movement --- Cargo.lock | 1 + terraform-provider-example/src/main.rs | 3 +- terustform/Cargo.toml | 1 + terustform/src/datasource.rs | 3 +- terustform/src/lib.rs | 7 +- terustform/src/server/grpc.rs | 39 +-------- terustform/src/server/handler.rs | 116 +++++++++++++++++++++++++ terustform/src/server/mod.rs | 73 +--------------- 8 files changed, 132 insertions(+), 111 deletions(-) create mode 100644 terustform/src/server/handler.rs diff --git a/Cargo.lock b/Cargo.lock index 50ba838..18cb450 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1090,6 +1090,7 @@ dependencies = [ name = "terustform" version = "0.1.0" dependencies = [ + "async-trait", "base64 0.22.0", "eyre", "prost", diff --git a/terraform-provider-example/src/main.rs b/terraform-provider-example/src/main.rs index 451b33d..720725d 100644 --- a/terraform-provider-example/src/main.rs +++ b/terraform-provider-example/src/main.rs @@ -32,6 +32,7 @@ struct ExampleDataSourceModel { id: StringValue, } +#[terustform::async_trait] impl DataSource for ExampleDataSource { fn name(&self, provider_name: &str) -> String { format!("{provider_name}_kitty") @@ -69,7 +70,7 @@ impl DataSource for ExampleDataSource { } } - fn read(&self, config: Value) -> DResult { + async fn read(&self, config: Value) -> DResult { let mut model = ExampleDataSourceModel::from_value(config, &AttrPath::root())?; let name_str = model.name.expect_known(AttrPath::attr("name"))?; diff --git a/terustform/Cargo.toml b/terustform/Cargo.toml index f6c557e..427263e 100644 --- a/terustform/Cargo.toml +++ b/terustform/Cargo.toml @@ -22,6 +22,7 @@ tokio-util = "0.7.10" tonic = { version = "0.11.0", features = ["tls"] } tracing = "0.1.40" tracing-subscriber = "0.3.18" +async-trait = "0.1.80" [build-dependencies] tonic-build = "0.11.0" diff --git a/terustform/src/datasource.rs b/terustform/src/datasource.rs index 6261339..438965d 100644 --- a/terustform/src/datasource.rs +++ b/terustform/src/datasource.rs @@ -4,11 +4,12 @@ use crate::values::{Type, Value}; use super::DResult; +#[crate::async_trait] pub trait DataSource: Send + Sync { fn name(&self, provider_name: &str) -> String; fn schema(&self) -> Schema; // todo: probably want some kind of Value+Schema thing like tfsdk? whatever. - fn read(&self, config: Value) -> DResult; + async fn read(&self, config: Value) -> DResult; fn erase(self) -> Box where diff --git a/terustform/src/lib.rs b/terustform/src/lib.rs index 0650511..8e74b8f 100644 --- a/terustform/src/lib.rs +++ b/terustform/src/lib.rs @@ -6,11 +6,14 @@ pub mod datasource; pub mod provider; pub use diag::*; -pub use terustform_macros::Model; pub use values::*; -use provider::Provider; +pub use terustform_macros::Model; +pub use async_trait::async_trait; +pub use eyre; + +use provider::Provider; use tracing::Level; pub async fn start(provider: &dyn Provider) -> eyre::Result<()> { diff --git a/terustform/src/server/grpc.rs b/terustform/src/server/grpc.rs index 9d5b3f6..b82e60f 100644 --- a/terustform/src/server/grpc.rs +++ b/terustform/src/server/grpc.rs @@ -228,44 +228,9 @@ impl Provider for super::ProviderHandler { request: Request, ) -> Result, Status> { tracing::info!("read_data_source"); + let req = request.get_ref(); - let ds = self - .state - .as_ref() - .unwrap() - .data_sources - .get(&request.get_ref().type_name) - .unwrap(); - - let typ = ds.schema().typ(); - let config = match &request.get_ref().config { - None => crate::values::Value::Null, - Some(v) => { - let value = crate::values::Value::msg_unpack(&v.msgpack, &typ); - match value { - Ok(value) => value, - Err(errs) => { - return Ok(Response::new(tfplugin6::read_data_source::Response { - deferred: None, - state: None, - diagnostics: errs.to_tfplugin_diags(), - })); - } - } - } - }; - - let state = ds.read(config); - let (state, diagnostics) = match state { - Ok(s) => ( - Some(tfplugin6::DynamicValue { - msgpack: s.msg_pack(), - json: vec![], - }), - vec![], - ), - Err(errs) => (None, errs.to_tfplugin_diags()), - }; + let (state, diagnostics) = self.do_read_data_source(&req.type_name, &req.config).await; let reply = tfplugin6::read_data_source::Response { state, diff --git a/terustform/src/server/handler.rs b/terustform/src/server/handler.rs new file mode 100644 index 0000000..ccecb6c --- /dev/null +++ b/terustform/src/server/handler.rs @@ -0,0 +1,116 @@ +use std::collections::HashMap; + +use tokio_util::sync::CancellationToken; + +use crate::{datasource::DataSource, provider::Provider}; + +use super::{grpc::tfplugin6, Schemas}; + +pub struct ProviderHandler { + pub(super) shutdown: CancellationToken, + /// Delayed diagnostics reporting in `GetProviderSchema` for better UX. + state: Result>, +} + +struct ProviderState { + data_sources: HashMap>, +} + +impl ProviderHandler { + /// Creates a new `ProviderHandler`. + /// This function is infallible, as it is not called during a time where reporting errors nicely is possible. + /// If there's an error, we just taint our internal state and report errors in `GetProviderSchema`. + pub fn new(shutdown: CancellationToken, provider: &dyn Provider) -> Self { + let name = provider.name(); + let mut data_sources = HashMap::new(); + let mut errors = vec![]; + + for ds in provider.data_sources() { + let ds_name = ds.name(&name); + let entry = data_sources.insert(ds_name.clone(), ds); + if entry.is_some() { + errors.push(tfplugin6::Diagnostic { + severity: tfplugin6::diagnostic::Severity::Error as _, + summary: format!("data source {ds_name} exists more than once"), + detail: "".to_owned(), + attribute: None, + }); + } + } + + let state = if errors.len() > 0 { + Err(errors) + } else { + Ok(ProviderState { data_sources }) + }; + + Self { shutdown, state } + } + + pub(super) fn get_schemas(&self) -> Schemas { + let resources = HashMap::new(); + let state = match &self.state { + Ok(state) => state, + Err(errors) => { + return Schemas { + resources: HashMap::new(), + data_sources: HashMap::new(), + diagnostics: errors.clone(), + } + } + }; + let data_sources = state + .data_sources + .iter() + .map(|(name, ds)| (name.to_owned(), ds.schema().to_tfplugin())) + .collect::>(); + + Schemas { + resources, + data_sources, + diagnostics: vec![], + } + } + + pub(super) async fn do_read_data_source( + &self, + type_name: &str, + config: &Option, + ) -> (Option, Vec) { + let ds = self + .state + .as_ref() + .unwrap() + .data_sources + .get(type_name) + .unwrap(); + + let typ = ds.schema().typ(); + let config = match config { + None => crate::values::Value::Null, + Some(v) => { + let value = crate::values::Value::msg_unpack(&v.msgpack, &typ); + match value { + Ok(value) => value, + Err(errs) => { + return (None, errs.to_tfplugin_diags()); + } + } + } + }; + + let state = ds.read(config).await; + let (state, diagnostics) = match state { + Ok(s) => ( + Some(tfplugin6::DynamicValue { + msgpack: s.msg_pack(), + json: vec![], + }), + vec![], + ), + Err(errs) => (None, errs.to_tfplugin_diags()), + }; + + (state, diagnostics) + } +} diff --git a/terustform/src/server/mod.rs b/terustform/src/server/mod.rs index af32af6..83506c3 100644 --- a/terustform/src/server/mod.rs +++ b/terustform/src/server/mod.rs @@ -1,6 +1,7 @@ mod cert; mod convert; mod grpc; +mod handler; use std::collections::HashMap; use std::env; @@ -9,11 +10,9 @@ use std::path::PathBuf; use base64::Engine; use eyre::{bail, Context}; use tokio::net::UnixListener; -use tokio_util::sync::CancellationToken; use tonic::transport::{Certificate, ServerTlsConfig}; use tracing::info; -use crate::datasource::DataSource; use crate::provider::Provider; pub use grpc::plugin::grpc_controller_server::GrpcControllerServer; @@ -21,73 +20,7 @@ pub use grpc::tfplugin6::provider_server::ProviderServer; pub use grpc::Controller; use self::grpc::tfplugin6; - -pub struct ProviderHandler { - shutdown: CancellationToken, - /// Delayed diagnostics reporting in `GetProviderSchema` for better UX. - state: Result>, -} - -struct ProviderState { - data_sources: HashMap>, -} - -impl ProviderHandler { - /// Creates a new `ProviderHandler`. - /// This function is infallible, as it is not called during a time where reporting errors nicely is possible. - /// If there's an error, we just taint our internal state and report errors in `GetProviderSchema`. - pub fn new(shutdown: CancellationToken, provider: &dyn Provider) -> Self { - let name = provider.name(); - let mut data_sources = HashMap::new(); - let mut errors = vec![]; - - for ds in provider.data_sources() { - let ds_name = ds.name(&name); - let entry = data_sources.insert(ds_name.clone(), ds); - if entry.is_some() { - errors.push(tfplugin6::Diagnostic { - severity: tfplugin6::diagnostic::Severity::Error as _, - summary: format!("data source {ds_name} exists more than once"), - detail: "".to_owned(), - attribute: None, - }); - } - } - - let state = if errors.len() > 0 { - Err(errors) - } else { - Ok(ProviderState { data_sources }) - }; - - Self { shutdown, state } - } - - fn get_schemas(&self) -> Schemas { - let resources = HashMap::new(); - let state = match &self.state { - Ok(state) => state, - Err(errors) => { - return Schemas { - resources: HashMap::new(), - data_sources: HashMap::new(), - diagnostics: errors.clone(), - } - } - }; - let data_sources = state - .data_sources - .iter() - .map(|(name, ds)| (name.to_owned(), ds.schema().to_tfplugin())) - .collect::>(); - - Schemas { - resources, - data_sources, - diagnostics: vec![], - } - } -} +use self::handler::ProviderHandler; struct Schemas { resources: HashMap, @@ -125,7 +58,7 @@ pub async fn serve(provider: &dyn Provider) -> eyre::Result<()> { let server = tonic::transport::Server::builder() .tls_config(tls) .wrap_err("invalid TLS config")? - .add_service(ProviderServer::new(ProviderHandler::new( + .add_service(ProviderServer::new(handler::ProviderHandler::new( shutdown.clone(), provider, )))