Continue implementing resources

This commit is contained in:
nora 2024-04-29 21:24:03 +02:00
parent bf7bd330b3
commit f46d9b299d
13 changed files with 309 additions and 106 deletions

9
Cargo.lock generated
View file

@ -1199,18 +1199,18 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.197" version = "1.0.199"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" checksum = "0c9f6e76df036c77cd94996771fb40db98187f096dd0b9af39c6c6e452ba966a"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.197" version = "1.0.199"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" checksum = "11bd257a6541e141e42ca6d24ae26f7714887b47e89aa739099104c7e4d3b7fc"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -1331,6 +1331,7 @@ dependencies = [
"dto", "dto",
"eyre", "eyre",
"reqwest", "reqwest",
"serde",
"terustform", "terustform",
"tokio", "tokio",
] ]

View file

@ -11,3 +11,4 @@ terustform = { path = "../terustform" }
tokio = { version = "1.37.0", features = ["full"] } tokio = { version = "1.37.0", features = ["full"] }
dto = { git = "https://github.com/nilstrieb-lehre/davinci-cors.git", rev = "bef75a802cf48cf63d171136c2cea67b83055387" } dto = { git = "https://github.com/nilstrieb-lehre/davinci-cors.git", rev = "bef75a802cf48cf63d171136c2cea67b83055387" }
serde = "1.0.199"

View file

@ -30,10 +30,7 @@ impl CorsClient {
.wrap_err("Token is invalid utf8")?; .wrap_err("Token is invalid utf8")?;
let mut headers = HeaderMap::new(); let mut headers = HeaderMap::new();
headers.insert( headers.insert("Authorization", HeaderValue::from_str(token).unwrap());
"Authorization",
HeaderValue::from_str(token).unwrap(),
);
let client = reqwest::Client::builder() let client = reqwest::Client::builder()
.default_headers(headers) .default_headers(headers)
.build() .build()
@ -46,24 +43,37 @@ impl CorsClient {
Ok(do_request(self.client.get(format!("{URL}/hugo"))) Ok(do_request(self.client.get(format!("{URL}/hugo")))
.await? .await?
.text() .text()
.await?) .await
.wrap_err("failed to get hugo")?)
} }
pub async fn get_class(&self, id: &str) -> Result<dto::Class> { pub async fn get_class(&self, id: &str) -> Result<dto::Class> {
Ok(do_request(self.client.get(format!("{URL}/classes/{id}"))) Ok(
.await? do_request_body(self.client.get(format!("{URL}/classes/{id}")))
.json() .await
.await?) .wrap_err("failed to get class")?,
)
}
pub async fn post_class(&self, class: &dto::Class) -> Result<dto::Class> {
Ok(
do_request_body(self.client.post(format!("{URL}/classes")).json(class))
.await
.wrap_err("creating class")?,
)
} }
} }
async fn do_request_body<T: serde::de::DeserializeOwned>(req: RequestBuilder) -> Result<T> {
Ok(do_request(req).await?.json().await?)
}
async fn do_request(req: RequestBuilder) -> Result<Response> { async fn do_request(req: RequestBuilder) -> Result<Response> {
dbg!(&req);
let res = req.send().await?; let res = req.send().await?;
if let Err(err) = res.error_for_status_ref() { if let Err(err) = res.error_for_status_ref() {
let text = res.text().await.unwrap_or_default(); let text = res.text().await.unwrap_or_default();
return Err(err).wrap_err(text); return Err(err).wrap_err(text);
} }
res.error_for_status().wrap_err("failed to get class") res.error_for_status().wrap_err("failed make request")
} }

View file

@ -13,18 +13,18 @@ pub struct ClassDataSource {
} }
#[derive(terustform::Model)] #[derive(terustform::Model)]
struct ClassDataSourceModel { pub(super) struct ClassModel {
id: StringValue, pub(super) id: StringValue,
name: StringValue, pub(super) name: StringValue,
description: StringValue, pub(super) description: StringValue,
discord_id: StringValue, pub(super) discord_id: StringValue,
} }
impl DataSource for ClassDataSource { impl DataSource for ClassDataSource {
type ProviderData = CorsClient; type ProviderData = CorsClient;
async fn read(&self, config: Value) -> DResult<Value> { async fn read(&self, config: Value) -> DResult<Value> {
let mut model = ClassDataSourceModel::from_value(config, &AttrPath::root())?; let model = ClassModel::from_value(config, &AttrPath::root())?;
let class = self let class = self
.client .client
@ -33,11 +33,13 @@ impl DataSource for ClassDataSource {
.wrap_err("failed to get class") .wrap_err("failed to get class")
.eyre_to_tf()?; .eyre_to_tf()?;
model.name = StringValue::Known(class.name); Ok(ClassModel {
model.description = StringValue::Known(class.description); id: model.id,
model.discord_id = StringValue::from(class.discord_id); name: class.name.into(),
description: class.description.into(),
Ok(model.to_value()) discord_id: class.discord_id.into(),
}
.to_value())
} }
fn name(provider_name: &str) -> String { fn name(provider_name: &str) -> String {

View file

@ -1,9 +1,13 @@
use std::collections::HashMap; use std::collections::HashMap;
use terustform::{resource::Resource, Attribute, DResult, Mode, Schema, Value}; use terustform::{
resource::Resource, AttrPath, Attribute, DResult, EyreExt, Mode, Schema, Value, ValueModel,
};
use crate::client::CorsClient; use crate::client::CorsClient;
use super::class_data_source::ClassModel;
pub struct ClassResource { pub struct ClassResource {
client: CorsClient, client: CorsClient,
} }
@ -11,19 +15,59 @@ pub struct ClassResource {
impl Resource for ClassResource { impl Resource for ClassResource {
type ProviderData = CorsClient; type ProviderData = CorsClient;
async fn read(&self, config: Value) -> DResult<Value> { async fn read(&self, current_state: Value) -> DResult<Value> {
let model = ClassModel::from_value(current_state, &AttrPath::root())?;
let class = self
.client
.get_class(model.id.expect_known(AttrPath::attr("id"))?)
.await
.eyre_to_tf()?;
Ok(ClassModel {
id: model.id,
name: class.name.into(),
description: class.description.into(),
discord_id: class.discord_id.into(),
}
.to_value())
}
async fn create(&self, _config: Value, plan: Value) -> DResult<Value> {
let model = ClassModel::from_root_value(plan)?;
let class = self
.client
.post_class(&dto::Class {
id: Default::default(),
members: vec![],
name: model.name.expect_known(AttrPath::attr("name"))?.clone(),
description: model
.description
.expect_known(AttrPath::attr("description"))?
.clone(),
discord_id: model
.discord_id
.expect_known_or_null(AttrPath::attr("discord_id"))?
.cloned(),
})
.await
.eyre_to_tf()?;
Ok(ClassModel {
id: class.id.to_string().into(),
name: class.name.into(),
description: class.description.into(),
discord_id: class.discord_id.into(),
}
.to_value())
}
async fn update(&self, _config: Value, _plan: Value, _state: Value) -> DResult<Value> {
todo!() todo!()
} }
async fn create(&self, config: Value) -> DResult<Value> { async fn delete(&self, _state: Value) -> DResult<Value> {
todo!()
}
async fn update(&self, config: Value) -> DResult<Value> {
todo!()
}
async fn delete(&self, state: Value) -> DResult<Value> {
todo!() todo!()
} }
@ -56,7 +100,7 @@ impl Resource for ClassResource {
"description".to_owned(), "description".to_owned(),
Attribute::String { Attribute::String {
description: "The description".to_owned(), description: "The description".to_owned(),
mode: Mode::Optional, mode: Mode::Required,
sensitive: false, sensitive: false,
}, },
), ),

View file

@ -22,4 +22,5 @@ output "class" {
resource "corsschool_class" "myclass" { resource "corsschool_class" "myclass" {
name = "meow" name = "meow"
description = "???"
} }

View file

@ -26,13 +26,11 @@ use tracing_subscriber::EnvFilter;
// Rest of the file. // Rest of the file.
use provider::Provider; use provider::Provider;
use tracing::Level;
pub async fn start<P: Provider>(provider: P) -> eyre::Result<()> { pub async fn start<P: Provider>(provider: P) -> eyre::Result<()> {
tracing_subscriber::fmt() tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.with_env_filter(EnvFilter::builder().parse_lossy( .with_env_filter(EnvFilter::builder().parse_lossy(
std::env::var("RUST_LOG").unwrap_or_else(|_| "h2=info,rustls=info,debug".into()), std::env::var("RUST_LOG").unwrap_or_else(|_| "h2=info,rustls=info,hyper_util=info,debug".into()),
)) ))
.with_writer(std::io::stderr) .with_writer(std::io::stderr)
.without_time() .without_time()

View file

@ -62,14 +62,14 @@ pub struct MkResource<D: ProviderData> {
} }
pub(crate) struct StoredResource { pub(crate) struct StoredResource {
pub(crate) ds: Arc<dyn DynResource>, pub(crate) rs: Arc<dyn DynResource>,
pub(crate) schema: Schema, pub(crate) schema: Schema,
} }
impl Clone for StoredResource { impl Clone for StoredResource {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
ds: self.ds.clone(), rs: self.rs.clone(),
schema: self.schema.clone(), schema: self.schema.clone(),
} }
} }
@ -82,7 +82,7 @@ impl<D: ProviderData> MkResource<D> {
schema: Rs::schema(), schema: Rs::schema(),
mk: |data| { mk: |data| {
Ok(StoredResource { Ok(StoredResource {
ds: Arc::new(Rs::new(data)?), rs: Arc::new(Rs::new(data)?),
schema: Rs::schema(), schema: Rs::schema(),
}) })
}, },

View file

@ -12,9 +12,18 @@ pub trait Resource: Sized + Send + Sync + 'static {
type ProviderData: ProviderData; type ProviderData: ProviderData;
// todo: probably want some kind of Value+Schema thing like tfsdk? whatever. // todo: probably want some kind of Value+Schema thing like tfsdk? whatever.
fn read(&self, config: Value) -> impl Future<Output = DResult<Value>> + Send + Sync; fn read(&self, current_state: Value) -> impl Future<Output = DResult<Value>> + Send + Sync;
fn create(&self, config: Value) -> impl Future<Output = DResult<Value>> + Send + Sync; fn create(
fn update(&self, config: Value) -> impl Future<Output = DResult<Value>> + Send + Sync; &self,
config: Value,
plan: Value,
) -> impl Future<Output = DResult<Value>> + Send + Sync;
fn update(
&self,
config: Value,
plan: Value,
state: Value,
) -> impl Future<Output = DResult<Value>> + Send + Sync;
fn delete(&self, state: Value) -> impl Future<Output = DResult<Value>> + Send + Sync; fn delete(&self, state: Value) -> impl Future<Output = DResult<Value>> + Send + Sync;
fn name(provider_name: &str) -> String; fn name(provider_name: &str) -> String;
@ -27,23 +36,23 @@ pub trait Resource: Sized + Send + Sync + 'static {
} }
pub(crate) trait DynResource: Send + Sync + 'static { pub(crate) trait DynResource: Send + Sync + 'static {
fn read(&self, config: Value) -> BoxFut<'_, DResult<Value>>; fn read(&self, current_state: Value) -> BoxFut<'_, DResult<Value>>;
fn create(&self, config: Value) -> BoxFut<'_, DResult<Value>>; fn create(&self, config: Value, plan: Value) -> BoxFut<'_, DResult<Value>>;
fn update(&self, config: Value) -> BoxFut<'_, DResult<Value>>; fn update(&self, config: Value, plan: Value, state: Value) -> BoxFut<'_, DResult<Value>>;
fn delete(&self, config: Value) -> BoxFut<'_, DResult<Value>>; fn delete(&self, state: Value) -> BoxFut<'_, DResult<Value>>;
} }
impl<R: Resource> DynResource for R { impl<R: Resource> DynResource for R {
fn read(&self, config: Value) -> BoxFut<'_, DResult<Value>> { fn read(&self, current_state: Value) -> BoxFut<'_, DResult<Value>> {
Box::pin(Resource::read(self, config)) Box::pin(Resource::read(self, current_state))
} }
fn create(&self, config: Value) -> BoxFut<'_, DResult<Value>> { fn create(&self, config: Value, plan: Value) -> BoxFut<'_, DResult<Value>> {
Box::pin(Resource::create(self, config)) Box::pin(Resource::create(self, config, plan))
} }
fn update(&self, config: Value) -> BoxFut<'_, DResult<Value>> { fn update(&self, config: Value, plan: Value, state: Value) -> BoxFut<'_, DResult<Value>> {
Box::pin(Resource::update(self, config)) Box::pin(Resource::update(self, config, plan, state))
} }
fn delete(&self, state: Value) -> BoxFut<'_, DResult<Value>> { fn delete(&self, state: Value) -> BoxFut<'_, DResult<Value>> {
Box::pin(Resource::create(self, state)) Box::pin(Resource::delete(self, state))
} }
} }

View file

@ -1,4 +1,4 @@
use crate::{values::Type, AttrPathSegment, Attribute, Diagnostics, Mode, Schema}; use crate::{values::Type, AttrPathSegment, Attribute, Diagnostics, Mode, Schema, Value};
use super::grpc::tfplugin6; use super::grpc::tfplugin6;
@ -107,3 +107,12 @@ impl Diagnostics {
.collect() .collect()
} }
} }
impl Value {
pub(crate) fn into_tfplugin(self) -> Option<tfplugin6::DynamicValue> {
Some(tfplugin6::DynamicValue {
msgpack: self.msg_pack(),
json: vec![],
})
}
}

View file

@ -43,6 +43,7 @@ impl<P: crate::provider::Provider> Provider for super::ProviderHandler<P> {
/// where clients may receive an unimplemented RPC error. Clients should /// where clients may receive an unimplemented RPC error. Clients should
/// ignore the error and call the GetProviderSchema RPC as a fallback. /// ignore the error and call the GetProviderSchema RPC as a fallback.
/// Returns data source, managed resource, and function metadata, such as names. /// Returns data source, managed resource, and function metadata, such as names.
#[tracing::instrument(skip(self, request))]
async fn get_metadata( async fn get_metadata(
&self, &self,
request: Request<tfplugin6::get_metadata::Request>, request: Request<tfplugin6::get_metadata::Request>,
@ -55,13 +56,14 @@ impl<P: crate::provider::Provider> Provider for super::ProviderHandler<P> {
/// GetSchema returns schema information for the provider, data resources, /// GetSchema returns schema information for the provider, data resources,
/// and managed resources. /// and managed resources.
/// Returns provider schema, provider metaschema, all resource schemas and all data source schemas. /// Returns provider schema, provider metaschema, all resource schemas and all data source schemas.
#[tracing::instrument(skip(self, request))]
async fn get_provider_schema( async fn get_provider_schema(
&self, &self,
request: Request<tfplugin6::get_provider_schema::Request>, request: Request<tfplugin6::get_provider_schema::Request>,
) -> Result<Response<tfplugin6::get_provider_schema::Response>, Status> { ) -> Result<Response<tfplugin6::get_provider_schema::Response>, Status> {
info!("Received get_provider_schema"); info!("get_provider_schema");
let schemas = self.get_schemas().await; let schemas = self.do_get_provider_schema().await;
let reply = tfplugin6::get_provider_schema::Response { let reply = tfplugin6::get_provider_schema::Response {
provider: Some(empty_schema()), provider: Some(empty_schema()),
@ -81,6 +83,7 @@ impl<P: crate::provider::Provider> Provider for super::ProviderHandler<P> {
} }
/// Validates the practitioner supplied provider configuration by verifying types conform to the schema and supports value validation diagnostics. /// Validates the practitioner supplied provider configuration by verifying types conform to the schema and supports value validation diagnostics.
#[tracing::instrument(skip(self, request))]
async fn validate_provider_config( async fn validate_provider_config(
&self, &self,
request: Request<tfplugin6::validate_provider_config::Request>, request: Request<tfplugin6::validate_provider_config::Request>,
@ -95,11 +98,14 @@ impl<P: crate::provider::Provider> Provider for super::ProviderHandler<P> {
} }
/// Validates the practitioner supplied resource configuration by verifying types conform to the schema and supports value validation diagnostics. /// Validates the practitioner supplied resource configuration by verifying types conform to the schema and supports value validation diagnostics.
#[tracing::instrument(skip(self, request), fields(name = request.get_ref().type_name))]
async fn validate_resource_config( async fn validate_resource_config(
&self, &self,
request: Request<tfplugin6::validate_resource_config::Request>, request: Request<tfplugin6::validate_resource_config::Request>,
) -> Result<Response<tfplugin6::validate_resource_config::Response>, Status> { ) -> Result<Response<tfplugin6::validate_resource_config::Response>, Status> {
tracing::info!("validate_resource_config"); tracing::info!(name=?request.get_ref().type_name, "validate_resource_config");
// No validators for now.
let reply = tfplugin6::validate_resource_config::Response { let reply = tfplugin6::validate_resource_config::Response {
diagnostics: vec![], diagnostics: vec![],
@ -109,11 +115,14 @@ impl<P: crate::provider::Provider> Provider for super::ProviderHandler<P> {
} }
/// Validates the practitioner supplied data source configuration by verifying types conform to the schema and supports value validation diagnostics. /// Validates the practitioner supplied data source configuration by verifying types conform to the schema and supports value validation diagnostics.
#[tracing::instrument(skip(self, request), fields(name = request.get_ref().type_name))]
async fn validate_data_resource_config( async fn validate_data_resource_config(
&self, &self,
request: Request<tfplugin6::validate_data_resource_config::Request>, request: Request<tfplugin6::validate_data_resource_config::Request>,
) -> Result<Response<tfplugin6::validate_data_resource_config::Response>, Status> { ) -> Result<Response<tfplugin6::validate_data_resource_config::Response>, Status> {
tracing::info!("validate_data_resource_config"); tracing::info!(name=?request.get_ref().type_name, "validate_data_resource_config");
// No validators for now.
let reply = tfplugin6::validate_data_resource_config::Response { let reply = tfplugin6::validate_data_resource_config::Response {
diagnostics: vec![], diagnostics: vec![],
@ -124,11 +133,12 @@ impl<P: crate::provider::Provider> Provider for super::ProviderHandler<P> {
/// Called when a resource has existing state. Primarily useful for when the schema version does not match the current version. /// Called when a resource has existing state. Primarily useful for when the schema version does not match the current version.
/// The provider is expected to modify the state to upgrade it to the latest schema. /// The provider is expected to modify the state to upgrade it to the latest schema.
#[tracing::instrument(skip(self, request), fields(name = request.get_ref().type_name))]
async fn upgrade_resource_state( async fn upgrade_resource_state(
&self, &self,
request: Request<tfplugin6::upgrade_resource_state::Request>, request: Request<tfplugin6::upgrade_resource_state::Request>,
) -> Result<Response<tfplugin6::upgrade_resource_state::Response>, Status> { ) -> Result<Response<tfplugin6::upgrade_resource_state::Response>, Status> {
tracing::info!("upgrade_resource_state"); tracing::info!(name=?request.get_ref().type_name, "upgrade_resource_state");
// We don't do anything interesting, it's fine. // We don't do anything interesting, it's fine.
let reply = tfplugin6::upgrade_resource_state::Response { let reply = tfplugin6::upgrade_resource_state::Response {
upgraded_state: None, upgraded_state: None,
@ -139,27 +149,33 @@ impl<P: crate::provider::Provider> Provider for super::ProviderHandler<P> {
} }
/// ////// One-time initialization, called before other functions below /// ////// One-time initialization, called before other functions below
/// Passes the practitioner supplied provider configuration to the provider. /// Passes the practitioner supplied provider configuration to the provider.
#[tracing::instrument(skip(self, request))]
async fn configure_provider( async fn configure_provider(
&self, &self,
request: Request<tfplugin6::configure_provider::Request>, request: Request<tfplugin6::configure_provider::Request>,
) -> Result<Response<tfplugin6::configure_provider::Response>, Status> { ) -> Result<Response<tfplugin6::configure_provider::Response>, Status> {
tracing::info!("configure_provider"); tracing::info!("configure_provider");
let diagnostics = self.do_configure_provider(&request.get_ref().config).await; let (_, diagnostics) = self.do_configure_provider(&request.get_ref().config).await;
let reply = tfplugin6::configure_provider::Response { diagnostics }; let reply = tfplugin6::configure_provider::Response { diagnostics };
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
/// ////// Managed Resource Lifecycle /// ////// Managed Resource Lifecycle
/// Called when refreshing a resource's state. /// Called when refreshing a resource's state.
#[tracing::instrument(skip(self, request), fields(name = request.get_ref().type_name))]
async fn read_resource( async fn read_resource(
&self, &self,
request: Request<tfplugin6::read_resource::Request>, request: Request<tfplugin6::read_resource::Request>,
) -> Result<Response<tfplugin6::read_resource::Response>, Status> { ) -> Result<Response<tfplugin6::read_resource::Response>, Status> {
tracing::info!("read_resource"); let req = request.get_ref();
let (new_state, diagnostics) = self
.do_read_resource(&req.type_name, &req.current_state)
.await;
let reply = tfplugin6::read_resource::Response { let reply = tfplugin6::read_resource::Response {
deferred: None, deferred: None,
diagnostics: vec![], diagnostics: vec![],
new_state: request.into_inner().current_state, new_state,
private: vec![], private: vec![],
}; };
@ -167,11 +183,14 @@ impl<P: crate::provider::Provider> Provider for super::ProviderHandler<P> {
} }
/// Calculates a plan for a resource. A proposed new state is generated, which the provider can modify. /// Calculates a plan for a resource. A proposed new state is generated, which the provider can modify.
#[tracing::instrument(skip(self, request), fields(name = request.get_ref().type_name))]
async fn plan_resource_change( async fn plan_resource_change(
&self, &self,
request: Request<tfplugin6::plan_resource_change::Request>, request: Request<tfplugin6::plan_resource_change::Request>,
) -> Result<Response<tfplugin6::plan_resource_change::Response>, Status> { ) -> Result<Response<tfplugin6::plan_resource_change::Response>, Status> {
tracing::info!("plan_resource_change"); tracing::info!(name=?request.get_ref().type_name, "plan_resource_change");
// We don't do anything interesting like requires_replace for now.
let reply = tfplugin6::plan_resource_change::Response { let reply = tfplugin6::plan_resource_change::Response {
planned_state: request.into_inner().proposed_new_state, planned_state: request.into_inner().proposed_new_state,
@ -187,16 +206,27 @@ impl<P: crate::provider::Provider> Provider for super::ProviderHandler<P> {
/// Called when a practitioner has approved a planned change. /// Called when a practitioner has approved a planned change.
/// The provider is to apply the changes contained in the plan, and return a resulting state matching the given plan. /// The provider is to apply the changes contained in the plan, and return a resulting state matching the given plan.
#[tracing::instrument(skip(self, request), fields(name = request.get_ref().type_name))]
async fn apply_resource_change( async fn apply_resource_change(
&self, &self,
request: Request<tfplugin6::apply_resource_change::Request>, request: Request<tfplugin6::apply_resource_change::Request>,
) -> Result<Response<tfplugin6::apply_resource_change::Response>, Status> { ) -> Result<Response<tfplugin6::apply_resource_change::Response>, Status> {
tracing::info!("apply_resource_change"); tracing::info!(name=?request.get_ref().type_name, "apply_resource_change");
let req = request.get_ref();
let (new_state, diagnostics) = self
.do_apply_resource_change(
&req.type_name,
&req.prior_state,
&req.planned_state,
&req.config,
)
.await;
let reply = tfplugin6::apply_resource_change::Response { let reply = tfplugin6::apply_resource_change::Response {
new_state: request.into_inner().planned_state, new_state,
private: vec![], private: vec![],
diagnostics: vec![], diagnostics,
legacy_type_system: false, legacy_type_system: false,
}; };
@ -204,29 +234,33 @@ impl<P: crate::provider::Provider> Provider for super::ProviderHandler<P> {
} }
/// Called when importing a resource into state so that the resource becomes managed. /// Called when importing a resource into state so that the resource becomes managed.
#[tracing::instrument(skip(self, request), fields(name = request.get_ref().type_name))]
async fn import_resource_state( async fn import_resource_state(
&self, &self,
request: Request<tfplugin6::import_resource_state::Request>, request: Request<tfplugin6::import_resource_state::Request>,
) -> Result<Response<tfplugin6::import_resource_state::Response>, Status> { ) -> Result<Response<tfplugin6::import_resource_state::Response>, Status> {
tracing::error!("import_resource_state"); tracing::error!(name=?request.get_ref().type_name, "import_resource_state");
todo!("import_resource_state") Err(Status::unimplemented("import_resource_state"))
} }
#[tracing::instrument(skip(self, request), fields(source_name = request.get_ref().source_type_name))]
async fn move_resource_state( async fn move_resource_state(
&self, &self,
request: Request<tfplugin6::move_resource_state::Request>, request: Request<tfplugin6::move_resource_state::Request>,
) -> Result<Response<tfplugin6::move_resource_state::Response>, Status> { ) -> Result<Response<tfplugin6::move_resource_state::Response>, Status> {
tracing::error!("move_resource_state"); tracing::error!(source_name=?request.get_ref().source_type_name, "move_resource_state");
todo!("move_resource_state") Err(Status::unimplemented("move_resource_state"))
} }
/// Called when refreshing a data source's state. /// Called when refreshing a data source's state.
#[tracing::instrument(skip(self, request), fields(name = request.get_ref().type_name))]
async fn read_data_source( async fn read_data_source(
&self, &self,
request: Request<tfplugin6::read_data_source::Request>, request: Request<tfplugin6::read_data_source::Request>,
) -> Result<Response<tfplugin6::read_data_source::Response>, Status> { ) -> Result<Response<tfplugin6::read_data_source::Response>, Status> {
tracing::info!("read_data_source"); tracing::info!(name=?request.get_ref().type_name, "read_data_source");
let req = request.get_ref(); let req = request.get_ref();
let (state, diagnostics) = self.do_read_data_source(&req.type_name, &req.config).await; let (state, diagnostics) = self.do_read_data_source(&req.type_name, &req.config).await;

View file

@ -2,6 +2,7 @@ use std::collections::HashMap;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
use crate::{ use crate::{
provider::{MkDataSource, MkResource, Provider, StoredDataSource, StoredResource}, provider::{MkDataSource, MkResource, Provider, StoredDataSource, StoredResource},
@ -31,6 +32,8 @@ enum ProviderState<P: Provider> {
}, },
} }
const TF_OK: Vec<tfplugin6::Diagnostic> = vec![];
impl<P: Provider> ProviderHandler<P> { impl<P: Provider> ProviderHandler<P> {
/// Creates a new `ProviderHandler`. /// Creates a new `ProviderHandler`.
/// This function is infallible, as it is not called during a time where reporting errors nicely is possible. /// This function is infallible, as it is not called during a time where reporting errors nicely is possible.
@ -79,7 +82,7 @@ impl<P: Provider> ProviderHandler<P> {
pub(super) async fn do_configure_provider( pub(super) async fn do_configure_provider(
&self, &self,
config: &Option<tfplugin6::DynamicValue>, config: &Option<tfplugin6::DynamicValue>,
) -> Vec<tfplugin6::Diagnostic> { ) -> (Option<()>, Vec<tfplugin6::Diagnostic>) {
let mut state = self.state.lock().await; let mut state = self.state.lock().await;
let (provider, mk_ds, mk_rs) = match &*state { let (provider, mk_ds, mk_rs) = match &*state {
ProviderState::Setup { ProviderState::Setup {
@ -87,18 +90,12 @@ impl<P: Provider> ProviderHandler<P> {
mk_ds, mk_ds,
mk_rs, mk_rs,
} => (provider, mk_ds, mk_rs), } => (provider, mk_ds, mk_rs),
ProviderState::Failed { diags } => return diags.clone().into_tfplugin_diags(), ProviderState::Failed { diags } => return (None, diags.clone().into_tfplugin_diags()),
ProviderState::Configured { .. } => unreachable!("called configure twice"), ProviderState::Configured { .. } => unreachable!("called configure twice"),
}; };
let config = match parse_dynamic_value(config, &provider.schema().typ()) { let config = tf_try!(parse_dynamic_value(config, &provider.schema().typ()));
Ok(config) => config,
Err(errs) => return errs.into_tfplugin_diags(),
};
let data = match provider.configure(config).await { let data = tf_try!(provider.configure(config).await);
Ok(data) => data,
Err(errs) => return errs.into_tfplugin_diags(),
};
let mut diags = vec![]; let mut diags = vec![];
let mut data_sources = HashMap::new(); let mut data_sources = HashMap::new();
@ -130,10 +127,10 @@ impl<P: Provider> ProviderHandler<P> {
resources, resources,
}; };
diags (Some(()), diags)
} }
pub(super) async fn get_schemas(&self) -> Schemas { pub(super) async fn do_get_provider_schema(&self) -> Schemas {
let state = self.state.lock().await; let state = self.state.lock().await;
let (mk_ds, mk_rs) = match &*state { let (mk_ds, mk_rs) = match &*state {
@ -171,7 +168,7 @@ impl<P: Provider> ProviderHandler<P> {
Schemas { Schemas {
resources, resources,
data_sources, data_sources,
diagnostics: vec![], diagnostics: TF_OK,
} }
} }
@ -197,29 +194,106 @@ impl<P: Provider> ProviderHandler<P> {
}; };
let typ = ds.schema.typ(); let typ = ds.schema.typ();
let config = tf_try!(parse_dynamic_value(config, &typ));
let state = tf_try!(ds.ds.read(config).await);
let config = match parse_dynamic_value(config, &typ) { (state.into_tfplugin(), TF_OK)
}
pub(super) async fn do_read_resource(
&self,
type_name: &str,
current_state: &Option<tfplugin6::DynamicValue>,
) -> (Option<tfplugin6::DynamicValue>, Vec<tfplugin6::Diagnostic>) {
let rs: StoredResource = {
let state = self.state.lock().await;
match &*state {
ProviderState::Setup { .. } => {
unreachable!("must be set up before calling data sources")
}
ProviderState::Failed { diags } => {
return (None, diags.clone().into_tfplugin_diags())
}
ProviderState::Configured {
data_sources: _,
resources,
} => resources.get(type_name).unwrap().clone(),
}
};
let typ = rs.schema.typ();
let current_state = tf_try!(parse_dynamic_value(current_state, &typ));
if current_state.is_null() {
info!("reading from null state, skipping");
return (None, TF_OK);
}
let new_state = tf_try!(rs.rs.read(current_state).await);
(new_state.into_tfplugin(), TF_OK)
}
pub(super) async fn do_apply_resource_change(
&self,
type_name: &str,
prior_state: &Option<tfplugin6::DynamicValue>,
planned_state: &Option<tfplugin6::DynamicValue>,
config: &Option<tfplugin6::DynamicValue>,
) -> (Option<tfplugin6::DynamicValue>, Vec<tfplugin6::Diagnostic>) {
let rs: StoredResource = {
let state = self.state.lock().await;
match &*state {
ProviderState::Setup { .. } => {
unreachable!("must be set up before calling data sources")
}
ProviderState::Failed { diags } => {
return (None, diags.clone().into_tfplugin_diags())
}
ProviderState::Configured {
data_sources: _,
resources,
} => resources.get(type_name).unwrap().clone(),
}
};
let typ = rs.schema.typ();
let prior_state = tf_try!(parse_dynamic_value(prior_state, &typ));
let planned_state = tf_try!(parse_dynamic_value(planned_state, &typ));
let config = tf_try!(parse_dynamic_value(config, &typ));
debug!(
?prior_state,
?planned_state,
?config,
"Applying resource change"
);
let new_state = if prior_state.is_null() {
debug!("Change is create");
tf_try!(rs.rs.create(config, planned_state).await)
} else if planned_state.is_null() {
debug!("Change is delete");
tf_try!(rs.rs.delete(config).await);
Value::Null
} else {
debug!("Change is udpate");
tf_try!(rs.rs.update(config, planned_state, prior_state).await)
};
(new_state.into_tfplugin(), TF_OK)
}
}
macro_rules! tf_try {
($e:expr) => {
match $e {
Ok(value) => value, Ok(value) => value,
Err(errs) => { Err(errs) => {
return (None, errs.into_tfplugin_diags()); return (None, errs.into_tfplugin_diags());
} }
};
let state = ds.ds.read(config).await;
let (state, diagnostics) = match state {
Ok(s) => (
Some(tfplugin6::DynamicValue {
msgpack: s.msg_pack(),
json: vec![],
}),
vec![],
),
Err(errs) => (None, errs.into_tfplugin_diags()),
};
(state, diagnostics)
} }
};
} }
use tf_try;
fn parse_dynamic_value(value: &Option<tfplugin6::DynamicValue>, typ: &Type) -> DResult<Value> { fn parse_dynamic_value(value: &Option<tfplugin6::DynamicValue>, typ: &Type) -> DResult<Value> {
match value { match value {

View file

@ -119,6 +119,10 @@ pub enum BaseValue<T> {
} }
impl<T> BaseValue<T> { impl<T> BaseValue<T> {
pub fn is_null(&self) -> bool {
matches!(self, Self::Null)
}
fn map<U>(self, f: impl FnOnce(T) -> U) -> BaseValue<U> { fn map<U>(self, f: impl FnOnce(T) -> U) -> BaseValue<U> {
self.try_map(|v| Ok(f(v))).unwrap() self.try_map(|v| Ok(f(v))).unwrap()
} }
@ -144,6 +148,18 @@ impl<T> BaseValue<T> {
BaseValue::Known(v) => Ok(v), BaseValue::Known(v) => Ok(v),
} }
} }
pub fn expect_known_or_null(&self, path: AttrPath) -> DResult<Option<&T>> {
match self {
BaseValue::Null => Ok(None),
BaseValue::Unknown => Err(Diagnostic::error_string(
"expected known value, found unknown value",
)
.with_path(path)
.into()),
BaseValue::Known(v) => Ok(Some(v)),
}
}
} }
impl<T> From<T> for BaseValue<T> { impl<T> From<T> for BaseValue<T> {
@ -165,6 +181,10 @@ pub trait ValueModel: Sized {
fn from_value(v: Value, path: &AttrPath) -> DResult<Self>; fn from_value(v: Value, path: &AttrPath) -> DResult<Self>;
fn to_value(self) -> Value; fn to_value(self) -> Value;
fn from_root_value(v: Value) -> DResult<Self> {
Self::from_value(v, &AttrPath::root())
}
} }
impl ValueModel for StringValue { impl ValueModel for StringValue {