diff --git a/blob_storage.py b/blob_storage.py
new file mode 100644
index 0000000..557d4f7
--- /dev/null
+++ b/blob_storage.py
@@ -0,0 +1,154 @@
+"""S3 and local storage backends for IFCB blob images."""
+
+from abc import ABC, abstractmethod
+import zipfile
+import os
+import boto3
+from typing import Dict, Optional
+import traceback
+from dataclasses import dataclass
+
+
+@dataclass
+class S3Config:
+    """Configuration for S3 blob storage."""
+    bucket_name: str
+    s3_url: str
+    prefix: str = "ifcb-blobs-slim-features/"
+
+
+class BlobStorage(ABC):
+    """Abstract interface for blob storage backends (S3 or local ZIP files)."""
+
+    @abstractmethod
+    def store_blob(self, sample_id: str, roi_number: int, blob_data: bytes) -> bool:
+        """Store a single blob."""
+        pass
+
+    @abstractmethod
+    def finalize_sample(self, sample_id: str) -> bool:
+        """Finalize storage for a sample (e.g., close its ZIP file)."""
+        pass
+
+    @abstractmethod
+    def cleanup(self):
+        """Clean up resources."""
+        pass
+
+
+class LocalZipStorage(BlobStorage):
+    """Local ZIP file storage backend (original behavior)."""
+
+    def __init__(self, output_directory: str):
+        self.output_directory = output_directory
+        self.zip_files: Dict[str, zipfile.ZipFile] = {}
+        self.blob_counts: Dict[str, int] = {}
+
+    def store_blob(self, sample_id: str, roi_number: int, blob_data: bytes) -> bool:
+        """Store a blob in the ZIP file for this sample."""
+        try:
+            if sample_id not in self.zip_files:
+                zip_filename = os.path.join(self.output_directory, f"{sample_id}_blobs_v4.zip")
+                self.zip_files[sample_id] = zipfile.ZipFile(zip_filename, 'w')
+                self.blob_counts[sample_id] = 0
+
+            filename = f"{sample_id}_{roi_number:05d}.png"
+            self.zip_files[sample_id].writestr(filename, blob_data)
+            self.blob_counts[sample_id] += 1
+            return True
+
+        except Exception as e:
+            print(f"Error storing blob {roi_number} for sample {sample_id}: {e}")
+            return False
+
+    def finalize_sample(self, sample_id: str) -> bool:
+        """Close the ZIP file for this sample."""
+        if sample_id in self.zip_files:
+            try:
+                self.zip_files[sample_id].close()
+                print(f"Stored {self.blob_counts[sample_id]} blobs for sample {sample_id} in ZIP")
+                del self.zip_files[sample_id]
+                del self.blob_counts[sample_id]
+                return True
+            except Exception as e:
+                print(f"Error finalizing ZIP for sample {sample_id}: {e}")
+                return False
+        return True
+
+    def cleanup(self):
+        """Close any remaining ZIP files."""
+        for sample_id in list(self.zip_files.keys()):
+            self.finalize_sample(sample_id)
+
+
+class S3BlobStorage(BlobStorage):
+    """S3 storage backend using boto3."""
+
+    def __init__(self, s3_config: S3Config):
+        self.config = s3_config
+        self.s3_client = None
+        self.blob_counts: Dict[str, int] = {}
+        self._setup_s3_client()
+
+    def _setup_s3_client(self):
+        """Initialize the S3 client."""
+        try:
+            session = boto3.Session()
+            self.s3_client = session.client(
+                's3',
+                endpoint_url=self.config.s3_url
+            )
+            print(f"Connected to S3 at {self.config.s3_url}")
+        except Exception as e:
+            print(f"Failed to set up S3 client: {e}")
+            raise
+
+    def store_blob(self, sample_id: str, roi_number: int, blob_data: bytes) -> bool:
+        """Store a blob in S3."""
+        try:
+            if sample_id not in self.blob_counts:
+                self.blob_counts[sample_id] = 0
+
+            key = f"{self.config.prefix}{sample_id}/{roi_number:05d}.png"
+
+            self.s3_client.put_object(
+                Bucket=self.config.bucket_name,
+                Key=key,
+                Body=blob_data,
+                ContentType='image/png'
+            )
+
+            self.blob_counts[sample_id] += 1
+            return True
+
+        except Exception as e:
+            print(f"Error storing blob {roi_number} for sample {sample_id} to S3: {e}")
+            traceback.print_exc()
+            return False
+
+    def finalize_sample(self, sample_id: str) -> bool:
+        """Log completion for this sample."""
+        if sample_id in self.blob_counts:
+            print(f"Stored {self.blob_counts[sample_id]} blobs for sample {sample_id} in S3")
+            del self.blob_counts[sample_id]
+        return True
+
+    def cleanup(self):
+        """Close the S3 client."""
+        if self.s3_client:
+            try:
+                self.s3_client.close()
+            except Exception:
+                pass
+
+
+def create_blob_storage(storage_mode: str, output_directory: str, s3_config: Optional[S3Config] = None) -> BlobStorage:
+    """Factory function to create the appropriate blob storage backend."""
+    if storage_mode == "local":
+        return LocalZipStorage(output_directory)
+    elif storage_mode == "s3":
+        if s3_config is None:
+            raise ValueError("S3 configuration required for S3 storage mode")
+        return S3BlobStorage(s3_config)
+    else:
+        raise ValueError(f"Unknown storage mode: {storage_mode}. Use 'local' or 's3'")
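A usage sketch of the new backend (not part of the diff; the bucket name, endpoint, and sample ID are placeholders, and boto3 resolves credentials from the environment as usual):

    import io
    import numpy as np
    from PIL import Image
    from blob_storage import S3Config, create_blob_storage

    # Encode a tiny dummy mask as PNG bytes, standing in for a real blob image.
    buf = io.BytesIO()
    Image.fromarray(np.zeros((8, 8), dtype=np.uint8)).save(buf, format="PNG")

    # Placeholder bucket/endpoint; any S3-compatible endpoint should work.
    config = S3Config(bucket_name="ifcb-blobs", s3_url="https://s3.example.org")
    storage = create_blob_storage("s3", output_directory=".", s3_config=config)
    try:
        storage.store_blob("D20240423T115846_IFCB127", 1, buf.getvalue())
        storage.finalize_sample("D20240423T115846_IFCB127")
    finally:
        storage.cleanup()
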
diff --git a/extract_slim_features.py b/extract_slim_features.py
index 93f3f7a..15576ea 100644
--- a/extract_slim_features.py
+++ b/extract_slim_features.py
@@ -12,8 +12,12 @@
 import traceback
 
 from ifcb_features.all import compute_features
+from blob_storage import S3Config, create_blob_storage
+from feature_storage import VastDBFeatureStorage, VastDBConfig
 
 FEATURE_COLUMNS = [
+    'sample_id',
+    'roi_number',
     'Area',
     'Biovolume',
     'BoundingBox_xwidth',
@@ -46,16 +50,20 @@
     'summedConvexPerimeter_over_Perimeter'
 ]
 
-def extract_and_save_all_features(data_directory, output_directory, bins=None):
+def extract_and_save_all_features(data_directory, output_directory, bins=None, blob_storage_mode="local", s3_config=None, feature_storage_mode="local", vastdb_config=None):
     """
     Extracts slim features from IFCB images in the given directory
-    and saves them to a CSV file.
+    and saves them to CSV or VastDB.
 
     Args:
         data_directory (str): Path to the directory containing IFCB data.
-        output_directory (str): Path to the directory where the CSV file will be saved.
+        output_directory (str): Path to the directory where the CSV file will be saved (when feature_storage_mode is "local").
         bins (list, optional): A list of bin names (e.g., 'D20240423T115846_IFCB127') to process.
                                If None, all bins in the data directory are processed. Defaults to None.
+        blob_storage_mode (str): Storage mode for blobs - "local" or "s3". Defaults to "local".
+        s3_config (S3Config, optional): S3 configuration when using S3 blob storage.
+        feature_storage_mode (str): Storage mode for features - "local" or "vastdb". Defaults to "local".
+        vastdb_config (VastDBConfig, optional): VastDB configuration when using VastDB feature storage.
     """
     try:
         data_dir = DataDirectory(data_directory)
@@ -87,47 +95,153 @@ def extract_and_save_all_features(data_directory, output_directory, bins=None):
         for sample in data_dir:
             samples_to_process.append(sample)
 
-    for sample in samples_to_process:
-        all_features = []
-        all_blobs = {}
-        features_output_filename = os.path.join(output_directory, f"{sample.lid}_features_v4.csv")
-        blobs_output_filename = os.path.join(output_directory, f"{sample.lid}_blobs_v4.zip")
-        for number, image in sample.images.items():
-            features = {
-                'roi_number': number,
-            }
+    # Validate blob storage configuration
+    if blob_storage_mode == "s3" and s3_config is None:
+        raise ValueError("S3 configuration required for S3 blob storage mode")
+    if blob_storage_mode not in ["local", "s3"]:
+        raise ValueError(f"Invalid blob storage mode '{blob_storage_mode}'. Use 'local' or 's3'")
+
+    # Validate feature storage configuration
+    if feature_storage_mode == "vastdb" and vastdb_config is None:
+        raise ValueError("VastDB configuration required for VastDB feature storage mode")
+    if feature_storage_mode not in ["local", "vastdb"]:
+        raise ValueError(f"Invalid feature storage mode '{feature_storage_mode}'. Use 'local' or 'vastdb'")
+
+    # Create blob storage backend
+    blob_storage = create_blob_storage(
+        storage_mode=blob_storage_mode,
+        output_directory=output_directory,
+        s3_config=s3_config
+    )
+
+    print(f"Blob storage: {blob_storage_mode}")
+
+    # Initialize feature storage
+    vastdb_storage = None
+    if feature_storage_mode == "vastdb":
+        vastdb_storage = VastDBFeatureStorage(vastdb_config)
+        print(f"Feature storage: vastdb ({vastdb_config.schema_name}.{vastdb_config.table_name})")
+    else:
+        print("Feature storage: local CSV")
+
+    try:
+        for sample in samples_to_process:
+            all_features = []
+            features_output_filename = os.path.join(output_directory, f"{sample.lid}_features_v4.csv")
+
+            try:
-                blobs_image, roi_features = compute_features(image)
-                features.update(roi_features)
+                with sample:  # Open ROI file
+                    for number, image in sample.images.items():
+                        features = {
+                            'sample_id': sample.lid,
+                            'roi_number': number,
+                        }
+                        try:
+                            blobs_image, roi_features = compute_features(image)
+                            features.update(roi_features)
 
-                img_buffer = io.BytesIO()
-                Image.fromarray((blobs_image > 0).astype(np.uint8) * 255).save(img_buffer, format="PNG")
-                all_blobs[number] = img_buffer.getvalue()
-            except Exception as e:
-                print(f"Error processing ROI {number} in sample {sample.pid}: {e}")
+                            # Store blob using the configured storage backend
+                            img_buffer = io.BytesIO()
+                            Image.fromarray((blobs_image > 0).astype(np.uint8) * 255).save(img_buffer, format="PNG")
+                            blob_data = img_buffer.getvalue()
+
+                            blob_storage.store_blob(sample.lid, number, blob_data)
+
+                        except Exception as e:
+                            print(f"Error processing ROI {number} in sample {sample.pid}: {e}")
+
+                        all_features.append(features)
 
-            all_features.append(features)
+                if all_features:
+                    df = pd.DataFrame.from_records(all_features, columns=FEATURE_COLUMNS)
 
-        if all_features:
-            df = pd.DataFrame.from_records(all_features, columns=['roi_number'] + FEATURE_COLUMNS)
-            df.to_csv(features_output_filename, index=False)
-
-        if all_blobs:
-            with zipfile.ZipFile(blobs_output_filename, 'w') as zf:
-                for roi_number, blob_data in all_blobs.items():
-                    filename = f"{sample.lid}_{roi_number:05d}.png"
-                    zf.writestr(filename, blob_data)
+                    # Save features based on storage mode
+                    if feature_storage_mode == "local":
+                        df.to_csv(features_output_filename, index=False)
+                    elif feature_storage_mode == "vastdb":
+                        vastdb_storage.insert_features(df)
+
+                # Finalize blob storage for this sample
+                blob_storage.finalize_sample(sample.lid)
+
+            except Exception as e:
+                print(f"Error processing sample {sample.pid}: {e}")
+                traceback.print_exc()
+                continue
+
+    finally:
+        # Clean up storage resources
+        blob_storage.cleanup()
+        if vastdb_storage:
+            vastdb_storage.cleanup()
 
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Extract various ROI features and save blobs as 1-bit PNGs.")
     parser.add_argument("data_directory", help="Path to the directory containing IFCB data.")
     parser.add_argument("output_directory", help="Path to the directory to save the output CSV file and blobs.")
     parser.add_argument("--bins", nargs='+', help="List of bin names to process (space-separated). If not provided, all bins are processed.")
+
+    # Blob storage options
+    parser.add_argument("--blob-storage-mode", choices=["local", "s3"], default="local",
+                        help="Storage mode for blob images (default: local)")
+    parser.add_argument("--s3-bucket", help="S3 bucket name (required when blob-storage-mode=s3)")
+    parser.add_argument("--s3-url", help="S3 endpoint URL (required when blob-storage-mode=s3)")
+    parser.add_argument("--s3-prefix", default="ifcb-blobs-slim-features/",
+                        help="S3 key prefix for blob storage (default: ifcb-blobs-slim-features/)")
+
+    # Feature storage options
+    parser.add_argument("--feature-storage-mode", choices=["local", "vastdb"], default="local",
+                        help="Storage mode for features (default: local)")
+    parser.add_argument("--vastdb-bucket", help="VastDB bucket name (required when feature-storage-mode=vastdb)")
+    parser.add_argument("--vastdb-schema", help="VastDB schema name (required when feature-storage-mode=vastdb)")
+    parser.add_argument("--vastdb-table", help="VastDB table name (required when feature-storage-mode=vastdb)")
+    parser.add_argument("--vastdb-url", help="VastDB endpoint URL (defaults to s3-url if not provided)")
+    parser.add_argument("--vastdb-access-key", help="VastDB access key (uses AWS_ACCESS_KEY_ID env var if not provided)")
+    parser.add_argument("--vastdb-secret-key", help="VastDB secret key (uses AWS_SECRET_ACCESS_KEY env var if not provided)")
 
     args = parser.parse_args()
+
+    # Set up S3 configuration if using S3 blob storage
+    if args.blob_storage_mode == "s3":
+        if not args.s3_bucket or not args.s3_url:
+            parser.error("--s3-bucket and --s3-url are required when using --blob-storage-mode=s3")
+        s3_config = S3Config(
+            bucket_name=args.s3_bucket,
+            s3_url=args.s3_url,
+            prefix=args.s3_prefix
+        )
+    else:
+        s3_config = None
+
+    # Set up VastDB configuration if using VastDB feature storage
+    vastdb_config = None
+    if args.feature_storage_mode == "vastdb":
+        if not args.vastdb_bucket or not args.vastdb_schema or not args.vastdb_table:
+            parser.error("--vastdb-bucket, --vastdb-schema, and --vastdb-table are required when using --feature-storage-mode=vastdb")
+
+        # Use the provided endpoint or fall back to the S3 URL
+        vastdb_url = args.vastdb_url or args.s3_url
+        if not vastdb_url:
+            parser.error("--vastdb-url or --s3-url must be provided when using --feature-storage-mode=vastdb")
+
+        # Get credentials from args or environment
+        access_key = args.vastdb_access_key or os.environ.get('AWS_ACCESS_KEY_ID')
+        secret_key = args.vastdb_secret_key or os.environ.get('AWS_SECRET_ACCESS_KEY')
+
+        if not access_key or not secret_key:
+            parser.error("VastDB credentials required: provide --vastdb-access-key/--vastdb-secret-key or set AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY environment variables")
+
+        vastdb_config = VastDBConfig(
+            bucket_name=args.vastdb_bucket,
+            schema_name=args.vastdb_schema,
+            table_name=args.vastdb_table,
+            endpoint_url=vastdb_url,
+            access_key=access_key,
+            secret_key=secret_key
+        )
 
     beginning = time.time()
-    extract_and_save_all_features(args.data_directory, args.output_directory, args.bins)
+    extract_and_save_all_features(args.data_directory, args.output_directory, args.bins, args.blob_storage_mode, s3_config, args.feature_storage_mode, vastdb_config)
     elapsed = time.time() - beginning
     print(f'Total extract time: {elapsed:.2f} seconds')
\ No newline at end of file
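For reference, the new CLI flags map onto the extraction function like this (a sketch; paths, bucket names, endpoint, and credentials are placeholders):

    from blob_storage import S3Config
    from feature_storage import VastDBConfig
    from extract_slim_features import extract_and_save_all_features

    # Equivalent to:
    #   python extract_slim_features.py /data/ifcb /data/out \
    #     --blob-storage-mode s3 --s3-bucket ifcb-blobs --s3-url https://s3.example.org \
    #     --feature-storage-mode vastdb --vastdb-bucket ifcb-db \
    #     --vastdb-schema ifcb --vastdb-table slim_features
    extract_and_save_all_features(
        "/data/ifcb", "/data/out", bins=None,
        blob_storage_mode="s3",
        s3_config=S3Config(bucket_name="ifcb-blobs", s3_url="https://s3.example.org"),
        feature_storage_mode="vastdb",
        vastdb_config=VastDBConfig(
            bucket_name="ifcb-db", schema_name="ifcb", table_name="slim_features",
            endpoint_url="https://s3.example.org",  # vastdb-url falls back to s3-url
            access_key="AKIA...", secret_key="...",
        ),
    )
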
diff --git a/feature_storage.py b/feature_storage.py
new file mode 100644
index 0000000..72c3f15
--- /dev/null
+++ b/feature_storage.py
@@ -0,0 +1,155 @@
features.""" +from dataclasses import dataclass +from typing import Optional +import pandas as pd +import pyarrow as pa +import vastdb +from blob_storage import S3Config + + +@dataclass +class VastDBConfig: + """Configuration for VastDB feature storage.""" + bucket_name: str + schema_name: str + table_name: str + endpoint_url: str + access_key: str + secret_key: str + + +class VastDBFeatureStorage: + """VastDB storage backend for features.""" + + def __init__(self, config: VastDBConfig): + self.config = config + self.session = None + self._setup_session() + + def _setup_session(self): + """Initialize VastDB session.""" + try: + self.session = vastdb.connect( + endpoint=self.config.endpoint_url, + access=self.config.access_key, + secret=self.config.secret_key + ) + print(f"Connected to VastDB at {self.config.endpoint_url}") + except Exception as e: + print(f"Failed to connect to VastDB: {e}") + raise + + def _get_features_schema(self) -> pa.Schema: + """Define PyArrow schema for features table.""" + # Composite key: sample_id (string) + roi_number (int64) + # All feature columns are float64 + return pa.schema([ + ('sample_id', pa.string()), + ('roi_number', pa.int64()), + ('Area', pa.float64()), + ('Biovolume', pa.float64()), + ('BoundingBox_xwidth', pa.float64()), + ('BoundingBox_ywidth', pa.float64()), + ('ConvexArea', pa.float64()), + ('ConvexPerimeter', pa.float64()), + ('Eccentricity', pa.float64()), + ('EquivDiameter', pa.float64()), + ('Extent', pa.float64()), + ('MajorAxisLength', pa.float64()), + ('MinorAxisLength', pa.float64()), + ('Orientation', pa.float64()), + ('Perimeter', pa.float64()), + ('RepresentativeWidth', pa.float64()), + ('Solidity', pa.float64()), + ('SurfaceArea', pa.float64()), + ('maxFeretDiameter', pa.float64()), + ('minFeretDiameter', pa.float64()), + ('numBlobs', pa.float64()), + ('summedArea', pa.float64()), + ('summedBiovolume', pa.float64()), + ('summedConvexArea', pa.float64()), + ('summedConvexPerimeter', pa.float64()), + ('summedMajorAxisLength', pa.float64()), + ('summedMinorAxisLength', pa.float64()), + ('summedPerimeter', pa.float64()), + ('summedSurfaceArea', pa.float64()), + ('Area_over_PerimeterSquared', pa.float64()), + ('Area_over_Perimeter', pa.float64()), + ('summedConvexPerimeter_over_Perimeter', pa.float64()), + ]) + + def _ensure_table_exists(self, tx): + """Ensure schema and table exist, create if they don't.""" + try: + # Get or create bucket + bucket = tx.bucket(self.config.bucket_name) + + # Try to get existing schema, create if doesn't exist + try: + schema = bucket.schema(self.config.schema_name) + print(f"Using existing schema: {self.config.schema_name}") + except Exception: + schema = bucket.create_schema(self.config.schema_name) + print(f"Created new schema: {self.config.schema_name}") + + # Try to get existing table, create if doesn't exist + try: + table = schema.table(self.config.table_name) + print(f"Using existing table: {self.config.table_name}") + except Exception: + columns = self._get_features_schema() + table = schema.create_table(self.config.table_name, columns) + print(f"Created new table: {self.config.table_name}") + + return table + + except Exception as e: + print(f"Error ensuring table exists: {e}") + raise + + def insert_features(self, features_df: pd.DataFrame) -> bool: + """Insert features DataFrame into VastDB table.""" + try: + with self.session.transaction() as tx: + table = self._ensure_table_exists(tx) + + # Convert pandas DataFrame to PyArrow Table + arrow_table = pa.Table.from_pandas(features_df, 
diff --git a/pyproject.toml b/pyproject.toml
index 9fe2903..f328d78 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -26,11 +26,16 @@ classifiers = [
 ]
 dependencies = [
     "numpy",
+    "pandas",
+    "pillow",
     "scipy",
     "scikit-image",
     "phasepack @ git+https://github.com/WHOIGit/phasepack@v1.6.1",
     "scikit-learn",
-    "pyifcb @ git+https://github.com/joefutrelle/pyifcb@v1.2.1"
+    "pyifcb @ git+https://github.com/joefutrelle/pyifcb@v1.2.1",
+    "boto3",
+    "pyarrow>=18.0",
+    "vastdb"
 ]
 
 [project.urls]
diff --git a/requirements.txt b/requirements.txt
deleted file mode 100644
index 0d1c87f..0000000
--- a/requirements.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-pandas==2.2.3
-scipy==1.13.1
-scikit-image==0.24.0
-git+https://github.com/WHOIGit/phasepack@v1.6.1
-scikit-learn==1.7.1
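Finally, a sketch of reading features back out, assuming the vastdb SDK's select() scan API (it returns a pyarrow.RecordBatchReader); endpoint, credentials, and object names are placeholders that must match what the extractor wrote:

    import vastdb

    session = vastdb.connect(endpoint="https://vast.example.org", access="AKIA...", secret="...")
    with session.transaction() as tx:
        table = tx.bucket("ifcb-db").schema("ifcb").table("slim_features")
        # Full scan; read all record batches into a pandas DataFrame.
        df = table.select().read_all().to_pandas()

    print(df[["sample_id", "roi_number", "Area"]].head())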