From 87d61f284ab8ce04067908def0f85a27a268d2ed Mon Sep 17 00:00:00 2001 From: Raphael Date: Thu, 29 Jan 2026 11:04:54 +0100 Subject: [PATCH 1/3] feat: added export on drop & insert to write all flows to a json --- src/main.rs | 8 +++- src/sagittarius/flow_service_client_impl.rs | 53 +++++++++++++++++++-- 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/src/main.rs b/src/main.rs index b0d514c..20784dc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -79,9 +79,15 @@ async fn main() { } }); + let env = match config.environment { + code0_flow::flow_config::environment::Environment::Development => String::from("DEVELOPMENT"), + code0_flow::flow_config::environment::Environment::Staging => String::from("STAGING"), + code0_flow::flow_config::environment::Environment::Production => String::from("PRODUCTION"), + }; + let mut flow_task = tokio::spawn(async move { let mut flow_client = - SagittariusFlowClient::new(backend_url_flow, kv_for_flow, runtime_token_flow).await; + SagittariusFlowClient::new(backend_url_flow, kv_for_flow, env, runtime_token_flow).await; flow_client.init_flow_stream().await; log::warn!("Flow stream task exited"); diff --git a/src/sagittarius/flow_service_client_impl.rs b/src/sagittarius/flow_service_client_impl.rs index e1d82ce..010d223 100644 --- a/src/sagittarius/flow_service_client_impl.rs +++ b/src/sagittarius/flow_service_client_impl.rs @@ -1,10 +1,11 @@ use futures::{StreamExt, TryStreamExt}; use prost::Message; -use std::sync::Arc; +use tokio::fs; +use std::{path::Path, sync::Arc}; use tonic::{Extensions, Request, transport::Channel}; -use tucana::sagittarius::{ +use tucana::{sagittarius::{ FlowLogonRequest, FlowResponse, flow_response::Data, flow_service_client::FlowServiceClient, -}; +}, shared::ValidationFlow}; use crate::{authorization::authorization::get_authorization_metadata, flow::get_flow_identifier}; @@ -12,6 +13,7 @@ use crate::{authorization::authorization::get_authorization_metadata, flow::get_ pub struct SagittariusFlowClient { store: Arc, client: FlowServiceClient, + env: String, token: String, } @@ -19,6 +21,7 @@ impl SagittariusFlowClient { pub async fn new( sagittarius_url: String, store: Arc, + env: String, token: String, ) -> SagittariusFlowClient { let client = match FlowServiceClient::connect(sagittarius_url).await { @@ -35,10 +38,50 @@ impl SagittariusFlowClient { SagittariusFlowClient { store, client, + env, token, } } + fn is_development(&self) -> bool { + self.env == String::from("DEVELOPMENT") + } + + async fn export_flows_json_overwrite(&self, flows: &[ValidationFlow]) { + if !self.is_development() { + return; + } + + log::info!("Will export flows into file because env is set to `DEVELOPMENT`"); + + let json = match serde_json::to_vec_pretty(flows) { + Ok(b) => b, + Err(e) => { + log::error!("Failed to serialize flows to JSON: {:?}", e); + return; + } + }; + + let final_path = Path::new("flowExport.json"); + let tmp_path = Path::new("flowExport.json.tmp"); + + if let Err(e) = fs::write(tmp_path, &json).await { + log::error!("Failed to write {}: {}", tmp_path.display(), e); + return; + } + + if let Err(e) = fs::rename(tmp_path, final_path).await { + log::warn!("rename failed (will try remove+rename): {}", e); + let _ = fs::remove_file(final_path).await; + if let Err(e2) = fs::rename(tmp_path, final_path).await { + log::error!("Failed to move export into place: {}", e2); + let _ = fs::remove_file(tmp_path).await; + } + } + + log::info!("Exported {} flows to {}", flows.len(), final_path.display()); + } + async fn handle_response(&mut self, response: FlowResponse) { let data = match response.data { Some(data) => { @@ -74,6 +117,10 @@ impl SagittariusFlowClient { //WIll drop all flows that it holds and insert all new ones Data::Flows(flows) => { log::info!("Dropping all Flows & inserting the new ones!"); + + // Writing all flows into an output if its in `DEVELOPMENT` + self.export_flows_json_overwrite(flows.flows.as_slice()).await; + let mut keys = match self.store.keys().await { Ok(keys) => keys.boxed(), Err(err) => { From 4ade5fe71eeb8e65efc4fd2327c6097ce474ced6 Mon Sep 17 00:00:00 2001 From: Raphael Date: Thu, 29 Jan 2026 11:10:18 +0100 Subject: [PATCH 2/3] fix: export should expect Flows object instead of [ValidationFlow] --- src/sagittarius/flow_service_client_impl.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/sagittarius/flow_service_client_impl.rs b/src/sagittarius/flow_service_client_impl.rs index 010d223..b17b097 100644 --- a/src/sagittarius/flow_service_client_impl.rs +++ b/src/sagittarius/flow_service_client_impl.rs @@ -5,7 +5,7 @@ use std::{path::Path, sync::Arc}; use tonic::{Extensions, Request, transport::Channel}; use tucana::{sagittarius::{ FlowLogonRequest, FlowResponse, flow_response::Data, flow_service_client::FlowServiceClient, -}, shared::ValidationFlow}; +}, shared::{Flows, ValidationFlow}}; use crate::{authorization::authorization::get_authorization_metadata, flow::get_flow_identifier}; @@ -47,14 +47,14 @@ impl SagittariusFlowClient { self.env == String::from("DEVELOPMENT") } - async fn export_flows_json_overwrite(&self, flows: &[ValidationFlow]) { + async fn export_flows_json_overwrite(&self, flows: Flows) { if !self.is_development() { return; } log::info!("Will export flows into file because env is set to `DEVELOPMENT`"); - let json = match serde_json::to_vec_pretty(flows) { + let json = match serde_json::to_vec_pretty(&flows) { Ok(b) => b, Err(e) => { log::error!("Failed to serialize flows to JSON: {:?}", e); @@ -79,7 +79,7 @@ impl SagittariusFlowClient { } } - log::info!("Exported {} flows to {}", flows.len(), final_path.display()); + log::info!("Exported {} flows to {}", flows.flows.len(), final_path.display()); } async fn handle_response(&mut self, response: FlowResponse) { @@ -119,7 +119,7 @@ impl SagittariusFlowClient { log::info!("Dropping all Flows & inserting the new ones!"); // Writing all flows into an output if its in `DEVELOPMENT` - self.export_flows_json_overwrite(flows.flows.as_slice()).await; + self.export_flows_json_overwrite(flows.clone()).await; let mut keys = match self.store.keys().await { Ok(keys) => keys.boxed(), From e1cffa4881fa65888892beecee1042fd103a1554 Mon Sep 17 00:00:00 2001 From: Raphael Date: Thu, 29 Jan 2026 11:10:28 +0100 Subject: [PATCH 3/3] drop: removed old flows --- flow/test_flow_one.json | 81 ----------------------------------------- 1 file changed, 81 deletions(-) delete mode 100644 flow/test_flow_one.json diff --git a/flow/test_flow_one.json b/flow/test_flow_one.json deleted file mode 100644 index 367b882..0000000 --- a/flow/test_flow_one.json +++ /dev/null @@ -1,81 +0,0 @@ -{ - "flows": [ - { - "flow_id": 1, - "project_id": 1, - "type": "REST", - "data_types": [], - "input_type_identifier": "HTTP_REQUEST", - "return_type_identifier": "HTTP_RESPONSE", - "settings": [ - { - "database_id": 1, - "flow_setting_id": "HTTP_METHOD", - "object": { - "fields": { - "method": { - "kind": { - "StringValue": "GET" - } - } - } - } - }, - { - "database_id": 1, - "flow_setting_id": "HTTP_URL", - "object": { - "fields": { - "url": { - "kind": { - "StringValue": "/hello-world" - } - } - } - } - }, - { - "database_id": 1, - "flow_setting_id": "HTTP_HOST", - "object": { - "fields": { - "host": { - "kind": { - "StringValue": "localhost" - } - } - } - } - } - ], - "starting_node": { - "database_id": 1, - "runtime_function_id": "std::control::break", - "parameters": [ - { - "database_id": 1, - "runtime_parameter_id": "value", - "value": { - "value": { - "LiteralValue": { - "kind": { - "StructValue": { - "fields": { - "host": { - "kind": { - "StringValue": "localhost" - } - } - } - } - } - } - } - } - } - ], - "next_node": null - } - } - ] -}