Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 0 additions & 81 deletions flow/test_flow_one.json

This file was deleted.

8 changes: 7 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,15 @@ async fn main() {
}
});

let env = match config.environment {
code0_flow::flow_config::environment::Environment::Development => String::from("DEVELOPMENT"),
code0_flow::flow_config::environment::Environment::Staging => String::from("STAGING"),
code0_flow::flow_config::environment::Environment::Production => String::from("PRODUCTION"),
};

let mut flow_task = tokio::spawn(async move {
let mut flow_client =
SagittariusFlowClient::new(backend_url_flow, kv_for_flow, runtime_token_flow).await;
SagittariusFlowClient::new(backend_url_flow, kv_for_flow, env, runtime_token_flow).await;

flow_client.init_flow_stream().await;
log::warn!("Flow stream task exited");
Expand Down
53 changes: 50 additions & 3 deletions src/sagittarius/flow_service_client_impl.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
use futures::{StreamExt, TryStreamExt};
use prost::Message;
use std::sync::Arc;
use tokio::fs;
use std::{path::Path, sync::Arc};
use tonic::{Extensions, Request, transport::Channel};
use tucana::sagittarius::{
use tucana::{sagittarius::{
FlowLogonRequest, FlowResponse, flow_response::Data, flow_service_client::FlowServiceClient,
};
}, shared::{Flows, ValidationFlow}};

use crate::{authorization::authorization::get_authorization_metadata, flow::get_flow_identifier};

#[derive(Clone)]
pub struct SagittariusFlowClient {
store: Arc<async_nats::jetstream::kv::Store>,
client: FlowServiceClient<Channel>,
env: String,
token: String,
}

impl SagittariusFlowClient {
pub async fn new(
sagittarius_url: String,
store: Arc<async_nats::jetstream::kv::Store>,
env: String,
token: String,
) -> SagittariusFlowClient {
let client = match FlowServiceClient::connect(sagittarius_url).await {
Expand All @@ -35,10 +38,50 @@ impl SagittariusFlowClient {
SagittariusFlowClient {
store,
client,
env,
token,
}
}

fn is_development(&self) -> bool {
self.env == String::from("DEVELOPMENT")
}

async fn export_flows_json_overwrite(&self, flows: Flows) {
if !self.is_development() {
return;
}

log::info!("Will export flows into file because env is set to `DEVELOPMENT`");

let json = match serde_json::to_vec_pretty(&flows) {
Ok(b) => b,
Err(e) => {
log::error!("Failed to serialize flows to JSON: {:?}", e);
return;
}
};

let final_path = Path::new("flowExport.json");
Copy link

Copilot AI Jan 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The filename uses camelCase ("flowExport.json") which is inconsistent with the snake_case naming convention used elsewhere in the codebase. The existing flow file is named "test_flow_one.json" (see configuration/mod.rs:63). Consider renaming to "flow_export.json" for consistency.

Copilot uses AI. Check for mistakes.
let tmp_path = Path::new("flowExport.json.tmp");
Comment on lines +65 to +66
Copy link

Copilot AI Jan 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The file path is hardcoded and relative to the current working directory, which may vary depending on how the application is started. This is inconsistent with how flow_fallback_path is configurable via environment variables (see configuration/mod.rs:61-64). Consider either making the export path configurable via environment variable or using an absolute path based on a known location.

Suggested change
let final_path = Path::new("flowExport.json");
let tmp_path = Path::new("flowExport.json.tmp");
let export_path = std::env::var("FLOW_EXPORT_PATH")
.unwrap_or_else(|_| "flowExport.json".to_string());
let final_path = Path::new(&export_path);
let tmp_path_string = format!("{}.tmp", export_path);
let tmp_path = Path::new(&tmp_path_string);

Copilot uses AI. Check for mistakes.

if let Err(e) = fs::write(tmp_path, &json).await {
log::error!("Failed to write {}: {}", tmp_path.display(), e);
return;
}

if let Err(e) = fs::rename(tmp_path, final_path).await {
log::warn!("rename failed (will try remove+rename): {}", e);
let _ = fs::remove_file(final_path).await;
if let Err(e2) = fs::rename(tmp_path, final_path).await {
log::error!("Failed to move export into place: {}", e2);
let _ = fs::remove_file(tmp_path).await;
}
}

log::info!("Exported {} flows to {}", flows.flows.len(), final_path.display());
}

async fn handle_response(&mut self, response: FlowResponse) {
let data = match response.data {
Some(data) => {
Expand Down Expand Up @@ -74,6 +117,10 @@ impl SagittariusFlowClient {
//WIll drop all flows that it holds and insert all new ones
Data::Flows(flows) => {
log::info!("Dropping all Flows & inserting the new ones!");

// Writing all flows into an output if its in `DEVELOPMENT`
self.export_flows_json_overwrite(flows.clone()).await;

let mut keys = match self.store.keys().await {
Ok(keys) => keys.boxed(),
Err(err) => {
Expand Down