Handle shutdown correctly

This commit is contained in:
nora 2024-04-12 19:58:59 +02:00
parent 41e578e199
commit f9da7ebe43
7 changed files with 69 additions and 10 deletions

1
Cargo.lock generated
View file

@ -1074,6 +1074,7 @@ dependencies = [
"time", "time",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tokio-util",
"tonic", "tonic",
"tonic-build", "tonic-build",
"tracing", "tracing",

View file

@ -18,6 +18,7 @@ tempfile = "3.10.1"
time = "0.3.35" time = "0.3.35"
tokio = { version = "1.37.0", features = ["full"] } tokio = { version = "1.37.0", features = ["full"] }
tokio-stream = { version = "0.1.15", features = ["net"] } tokio-stream = { version = "0.1.15", features = ["net"] }
tokio-util = "0.7.10"
tonic = { version = "0.11.0", features = ["tls"] } tonic = { version = "0.11.0", features = ["tls"] }
tracing = "0.1.40" tracing = "0.1.40"
tracing-subscriber = "0.3.18" tracing-subscriber = "0.3.18"

View file

@ -1,4 +1,5 @@
fn main() -> Result<(), Box<dyn std::error::Error>> { fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/tfplugin6.6.proto")?; tonic_build::compile_protos("proto/tfplugin6.6.proto")?;
tonic_build::compile_protos("proto/controller.proto")?;
Ok(()) Ok(())
} }

12
proto/controller.proto Normal file
View file

@ -0,0 +1,12 @@
syntax = "proto3";
package plugin;
option go_package = "./plugin";
message Empty {
}
// The GRPCController is responsible for telling the plugin server to shutdown.
service GRPCController {
rpc Shutdown(Empty) returns (Empty);
}

View file

@ -1,3 +1,5 @@
#![allow(dead_code)]
pub trait DataSource { pub trait DataSource {
fn schema(&self); fn schema(&self);
fn read(&self) -> DResult<()>; fn read(&self) -> DResult<()>;

View file

@ -1,7 +1,7 @@
mod cert; mod cert;
mod framework;
mod server; mod server;
mod values; mod values;
mod framework;
use std::{env, path::PathBuf}; use std::{env, path::PathBuf};
@ -43,15 +43,31 @@ async fn main() -> eyre::Result<()> {
let uds = UnixListener::bind(socket).wrap_err("failed to bind unix listener")?; let uds = UnixListener::bind(socket).wrap_err("failed to bind unix listener")?;
let uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds); let uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds);
tonic::transport::Server::builder() let token = tokio_util::sync::CancellationToken::new();
let server = tonic::transport::Server::builder()
.tls_config(tls) .tls_config(tls)
.wrap_err("invalid TLS config")? .wrap_err("invalid TLS config")?
.add_service(server::tfplugin6::provider_server::ProviderServer::new( .add_service(server::tfplugin6::provider_server::ProviderServer::new(
server::MyProvider, server::MyProvider {
shutdown: token.clone(),
},
)) ))
.serve_with_incoming(uds_stream) .add_service(
.await server::plugin::grpc_controller_server::GrpcControllerServer::new(
.wrap_err("failed to start server")?; server::MyController {
shutdown: token.clone(),
},
),
)
.serve_with_incoming(uds_stream);
tokio::select! {
_ = token.cancelled() => {}
result = server => {
result.wrap_err("failed to start server")?;
}
}
Ok(()) Ok(())
} }

View file

@ -4,19 +4,27 @@ pub mod tfplugin6 {
tonic::include_proto!("tfplugin6"); tonic::include_proto!("tfplugin6");
} }
pub mod plugin {
tonic::include_proto!("plugin");
}
use std::{ use std::{
collections::{BTreeMap, HashMap}, collections::{BTreeMap, HashMap},
sync::Mutex,
vec, vec,
}; };
use tfplugin6::provider_server::{Provider, ProviderServer}; use tfplugin6::provider_server::{Provider, ProviderServer};
use tokio_util::sync::CancellationToken;
use tonic::{transport::Server, Request, Response, Result, Status}; use tonic::{transport::Server, Request, Response, Result, Status};
use tracing::info; use tracing::info;
use crate::values::Type; use crate::values::Type;
#[derive(Debug, Default)] #[derive(Debug)]
pub struct MyProvider; pub struct MyProvider {
pub shutdown: CancellationToken,
}
fn empty_schema() -> tfplugin6::Schema { fn empty_schema() -> tfplugin6::Schema {
tfplugin6::Schema { tfplugin6::Schema {
@ -43,6 +51,7 @@ impl Provider for MyProvider {
&self, &self,
request: Request<tfplugin6::get_metadata::Request>, request: Request<tfplugin6::get_metadata::Request>,
) -> Result<Response<tfplugin6::get_metadata::Response>, Status> { ) -> Result<Response<tfplugin6::get_metadata::Response>, Status> {
info!("get_metadata");
Err(Status::unimplemented( Err(Status::unimplemented(
"GetMetadata: Not implemeneted".to_owned(), "GetMetadata: Not implemeneted".to_owned(),
)) ))
@ -250,8 +259,25 @@ impl Provider for MyProvider {
&self, &self,
request: Request<tfplugin6::stop_provider::Request>, request: Request<tfplugin6::stop_provider::Request>,
) -> Result<Response<tfplugin6::stop_provider::Response>, Status> { ) -> Result<Response<tfplugin6::stop_provider::Response>, Status> {
tracing::error!("stop_provider"); tracing::info!("stop_provider");
todo!("stop_provider") shutdown(&self.shutdown).await
}
}
pub struct MyController {
pub shutdown: CancellationToken,
}
async fn shutdown(token: &CancellationToken) -> ! {
token.cancel();
std::future::poll_fn::<(), _>(|_| std::task::Poll::Pending).await;
unreachable!("we've should have gone to sleep")
}
#[tonic::async_trait]
impl plugin::grpc_controller_server::GrpcController for MyController {
async fn shutdown(&self, request: Request<plugin::Empty>) -> Result<Response<plugin::Empty>> {
shutdown(&self.shutdown).await
} }
} }