more movement

This commit is contained in:
nora 2024-04-15 20:41:01 +02:00
parent 991642b90d
commit dc76e01e92
8 changed files with 132 additions and 111 deletions

1
Cargo.lock generated
View file

@ -1090,6 +1090,7 @@ dependencies = [
name = "terustform" name = "terustform"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-trait",
"base64 0.22.0", "base64 0.22.0",
"eyre", "eyre",
"prost", "prost",

View file

@ -32,6 +32,7 @@ struct ExampleDataSourceModel {
id: StringValue, id: StringValue,
} }
#[terustform::async_trait]
impl DataSource for ExampleDataSource { impl DataSource for ExampleDataSource {
fn name(&self, provider_name: &str) -> String { fn name(&self, provider_name: &str) -> String {
format!("{provider_name}_kitty") format!("{provider_name}_kitty")
@ -69,7 +70,7 @@ impl DataSource for ExampleDataSource {
} }
} }
fn read(&self, config: Value) -> DResult<Value> { async fn read(&self, config: Value) -> DResult<Value> {
let mut model = ExampleDataSourceModel::from_value(config, &AttrPath::root())?; let mut model = ExampleDataSourceModel::from_value(config, &AttrPath::root())?;
let name_str = model.name.expect_known(AttrPath::attr("name"))?; let name_str = model.name.expect_known(AttrPath::attr("name"))?;

View file

@ -22,6 +22,7 @@ tokio-util = "0.7.10"
tonic = { version = "0.11.0", features = ["tls"] } tonic = { version = "0.11.0", features = ["tls"] }
tracing = "0.1.40" tracing = "0.1.40"
tracing-subscriber = "0.3.18" tracing-subscriber = "0.3.18"
async-trait = "0.1.80"
[build-dependencies] [build-dependencies]
tonic-build = "0.11.0" tonic-build = "0.11.0"

View file

@ -4,11 +4,12 @@ use crate::values::{Type, Value};
use super::DResult; use super::DResult;
#[crate::async_trait]
pub trait DataSource: Send + Sync { pub trait DataSource: Send + Sync {
fn name(&self, provider_name: &str) -> String; fn name(&self, provider_name: &str) -> String;
fn schema(&self) -> Schema; fn schema(&self) -> Schema;
// todo: probably want some kind of Value+Schema thing like tfsdk? whatever. // todo: probably want some kind of Value+Schema thing like tfsdk? whatever.
fn read(&self, config: Value) -> DResult<Value>; async fn read(&self, config: Value) -> DResult<Value>;
fn erase(self) -> Box<dyn DataSource> fn erase(self) -> Box<dyn DataSource>
where where

View file

@ -6,11 +6,14 @@ pub mod datasource;
pub mod provider; pub mod provider;
pub use diag::*; pub use diag::*;
pub use terustform_macros::Model;
pub use values::*; pub use values::*;
use provider::Provider; pub use terustform_macros::Model;
pub use async_trait::async_trait;
pub use eyre;
use provider::Provider;
use tracing::Level; use tracing::Level;
pub async fn start(provider: &dyn Provider) -> eyre::Result<()> { pub async fn start(provider: &dyn Provider) -> eyre::Result<()> {

View file

@ -228,44 +228,9 @@ impl Provider for super::ProviderHandler {
request: Request<tfplugin6::read_data_source::Request>, request: Request<tfplugin6::read_data_source::Request>,
) -> Result<Response<tfplugin6::read_data_source::Response>, Status> { ) -> Result<Response<tfplugin6::read_data_source::Response>, Status> {
tracing::info!("read_data_source"); tracing::info!("read_data_source");
let req = request.get_ref();
let ds = self let (state, diagnostics) = self.do_read_data_source(&req.type_name, &req.config).await;
.state
.as_ref()
.unwrap()
.data_sources
.get(&request.get_ref().type_name)
.unwrap();
let typ = ds.schema().typ();
let config = match &request.get_ref().config {
None => crate::values::Value::Null,
Some(v) => {
let value = crate::values::Value::msg_unpack(&v.msgpack, &typ);
match value {
Ok(value) => value,
Err(errs) => {
return Ok(Response::new(tfplugin6::read_data_source::Response {
deferred: None,
state: None,
diagnostics: errs.to_tfplugin_diags(),
}));
}
}
}
};
let state = ds.read(config);
let (state, diagnostics) = match state {
Ok(s) => (
Some(tfplugin6::DynamicValue {
msgpack: s.msg_pack(),
json: vec![],
}),
vec![],
),
Err(errs) => (None, errs.to_tfplugin_diags()),
};
let reply = tfplugin6::read_data_source::Response { let reply = tfplugin6::read_data_source::Response {
state, state,

View file

@ -0,0 +1,116 @@
use std::collections::HashMap;
use tokio_util::sync::CancellationToken;
use crate::{datasource::DataSource, provider::Provider};
use super::{grpc::tfplugin6, Schemas};
pub struct ProviderHandler {
pub(super) shutdown: CancellationToken,
/// Delayed diagnostics reporting in `GetProviderSchema` for better UX.
state: Result<ProviderState, Vec<tfplugin6::Diagnostic>>,
}
struct ProviderState {
data_sources: HashMap<String, Box<dyn DataSource>>,
}
impl ProviderHandler {
/// Creates a new `ProviderHandler`.
/// This function is infallible, as it is not called during a time where reporting errors nicely is possible.
/// If there's an error, we just taint our internal state and report errors in `GetProviderSchema`.
pub fn new(shutdown: CancellationToken, provider: &dyn Provider) -> Self {
let name = provider.name();
let mut data_sources = HashMap::new();
let mut errors = vec![];
for ds in provider.data_sources() {
let ds_name = ds.name(&name);
let entry = data_sources.insert(ds_name.clone(), ds);
if entry.is_some() {
errors.push(tfplugin6::Diagnostic {
severity: tfplugin6::diagnostic::Severity::Error as _,
summary: format!("data source {ds_name} exists more than once"),
detail: "".to_owned(),
attribute: None,
});
}
}
let state = if errors.len() > 0 {
Err(errors)
} else {
Ok(ProviderState { data_sources })
};
Self { shutdown, state }
}
pub(super) fn get_schemas(&self) -> Schemas {
let resources = HashMap::new();
let state = match &self.state {
Ok(state) => state,
Err(errors) => {
return Schemas {
resources: HashMap::new(),
data_sources: HashMap::new(),
diagnostics: errors.clone(),
}
}
};
let data_sources = state
.data_sources
.iter()
.map(|(name, ds)| (name.to_owned(), ds.schema().to_tfplugin()))
.collect::<HashMap<String, tfplugin6::Schema>>();
Schemas {
resources,
data_sources,
diagnostics: vec![],
}
}
pub(super) async fn do_read_data_source(
&self,
type_name: &str,
config: &Option<tfplugin6::DynamicValue>,
) -> (Option<tfplugin6::DynamicValue>, Vec<tfplugin6::Diagnostic>) {
let ds = self
.state
.as_ref()
.unwrap()
.data_sources
.get(type_name)
.unwrap();
let typ = ds.schema().typ();
let config = match config {
None => crate::values::Value::Null,
Some(v) => {
let value = crate::values::Value::msg_unpack(&v.msgpack, &typ);
match value {
Ok(value) => value,
Err(errs) => {
return (None, errs.to_tfplugin_diags());
}
}
}
};
let state = ds.read(config).await;
let (state, diagnostics) = match state {
Ok(s) => (
Some(tfplugin6::DynamicValue {
msgpack: s.msg_pack(),
json: vec![],
}),
vec![],
),
Err(errs) => (None, errs.to_tfplugin_diags()),
};
(state, diagnostics)
}
}

View file

@ -1,6 +1,7 @@
mod cert; mod cert;
mod convert; mod convert;
mod grpc; mod grpc;
mod handler;
use std::collections::HashMap; use std::collections::HashMap;
use std::env; use std::env;
@ -9,11 +10,9 @@ use std::path::PathBuf;
use base64::Engine; use base64::Engine;
use eyre::{bail, Context}; use eyre::{bail, Context};
use tokio::net::UnixListener; use tokio::net::UnixListener;
use tokio_util::sync::CancellationToken;
use tonic::transport::{Certificate, ServerTlsConfig}; use tonic::transport::{Certificate, ServerTlsConfig};
use tracing::info; use tracing::info;
use crate::datasource::DataSource;
use crate::provider::Provider; use crate::provider::Provider;
pub use grpc::plugin::grpc_controller_server::GrpcControllerServer; pub use grpc::plugin::grpc_controller_server::GrpcControllerServer;
@ -21,73 +20,7 @@ pub use grpc::tfplugin6::provider_server::ProviderServer;
pub use grpc::Controller; pub use grpc::Controller;
use self::grpc::tfplugin6; use self::grpc::tfplugin6;
use self::handler::ProviderHandler;
pub struct ProviderHandler {
shutdown: CancellationToken,
/// Delayed diagnostics reporting in `GetProviderSchema` for better UX.
state: Result<ProviderState, Vec<tfplugin6::Diagnostic>>,
}
struct ProviderState {
data_sources: HashMap<String, Box<dyn DataSource>>,
}
impl ProviderHandler {
/// Creates a new `ProviderHandler`.
/// This function is infallible, as it is not called during a time where reporting errors nicely is possible.
/// If there's an error, we just taint our internal state and report errors in `GetProviderSchema`.
pub fn new(shutdown: CancellationToken, provider: &dyn Provider) -> Self {
let name = provider.name();
let mut data_sources = HashMap::new();
let mut errors = vec![];
for ds in provider.data_sources() {
let ds_name = ds.name(&name);
let entry = data_sources.insert(ds_name.clone(), ds);
if entry.is_some() {
errors.push(tfplugin6::Diagnostic {
severity: tfplugin6::diagnostic::Severity::Error as _,
summary: format!("data source {ds_name} exists more than once"),
detail: "".to_owned(),
attribute: None,
});
}
}
let state = if errors.len() > 0 {
Err(errors)
} else {
Ok(ProviderState { data_sources })
};
Self { shutdown, state }
}
fn get_schemas(&self) -> Schemas {
let resources = HashMap::new();
let state = match &self.state {
Ok(state) => state,
Err(errors) => {
return Schemas {
resources: HashMap::new(),
data_sources: HashMap::new(),
diagnostics: errors.clone(),
}
}
};
let data_sources = state
.data_sources
.iter()
.map(|(name, ds)| (name.to_owned(), ds.schema().to_tfplugin()))
.collect::<HashMap<String, tfplugin6::Schema>>();
Schemas {
resources,
data_sources,
diagnostics: vec![],
}
}
}
struct Schemas { struct Schemas {
resources: HashMap<String, tfplugin6::Schema>, resources: HashMap<String, tfplugin6::Schema>,
@ -125,7 +58,7 @@ pub async fn serve(provider: &dyn Provider) -> eyre::Result<()> {
let server = tonic::transport::Server::builder() let server = tonic::transport::Server::builder()
.tls_config(tls) .tls_config(tls)
.wrap_err("invalid TLS config")? .wrap_err("invalid TLS config")?
.add_service(ProviderServer::new(ProviderHandler::new( .add_service(ProviderServer::new(handler::ProviderHandler::new(
shutdown.clone(), shutdown.clone(),
provider, provider,
))) )))