156 changes: 156 additions & 0 deletions blob_storage.py
@@ -0,0 +1,156 @@
"""S3 and local storage backends for IFCB blob images."""

from abc import ABC, abstractmethod
import zipfile
import os
import io
import boto3
import botocore
from typing import Dict, Any
import traceback
from dataclasses import dataclass


@dataclass
class S3Config:
"""Configuration for S3 blob storage."""
bucket_name: str
s3_url: str
prefix: str = "ifcb-blobs-slim-features/"


class BlobStorage(ABC):
"""Abstract interface for blob storage backends (S3 or local ZIP files)."""

@abstractmethod
def store_blob(self, sample_id: str, roi_number: int, blob_data: bytes) -> bool:
"""Store a single blob."""
pass

@abstractmethod
def finalize_sample(self, sample_id: str) -> bool:
"""Finalize storage for a sample (e.g., close ZIP file)."""
pass

@abstractmethod
def cleanup(self):
"""Cleanup resources."""
pass


class LocalZipStorage(BlobStorage):
    """Local ZIP file storage backend (original behavior)."""

    def __init__(self, output_directory: str):
        self.output_directory = output_directory
        self.zip_files: Dict[str, zipfile.ZipFile] = {}
        self.blob_counts: Dict[str, int] = {}

    def store_blob(self, sample_id: str, roi_number: int, blob_data: bytes) -> bool:
        """Store a blob in the ZIP file for this sample."""
        try:
            if sample_id not in self.zip_files:
                zip_filename = os.path.join(self.output_directory, f"{sample_id}_blobs_v4.zip")
                self.zip_files[sample_id] = zipfile.ZipFile(zip_filename, 'w')
                self.blob_counts[sample_id] = 0

            filename = f"{sample_id}_{roi_number:05d}.png"
            self.zip_files[sample_id].writestr(filename, blob_data)
            self.blob_counts[sample_id] += 1
            return True

        except Exception as e:
            print(f"Error storing blob {roi_number} for sample {sample_id}: {e}")
            return False

    def finalize_sample(self, sample_id: str) -> bool:
        """Close the ZIP file for this sample."""
        if sample_id in self.zip_files:
            try:
                self.zip_files[sample_id].close()
                print(f"Stored {self.blob_counts[sample_id]} blobs for sample {sample_id} in ZIP")
                del self.zip_files[sample_id]
                del self.blob_counts[sample_id]
                return True
            except Exception as e:
                print(f"Error finalizing ZIP for sample {sample_id}: {e}")
                return False
        return True

    def cleanup(self):
        """Close any remaining ZIP files."""
        for sample_id in list(self.zip_files.keys()):
            self.finalize_sample(sample_id)


class S3BlobStorage(BlobStorage):
    """S3 storage backend using boto3."""

    def __init__(self, s3_config: S3Config):
        self.config = s3_config
        self.s3_client = None
        self.blob_counts: Dict[str, int] = {}
        self._setup_s3_client()

    def _setup_s3_client(self):
        """Initialize S3 client."""
        try:
            session = boto3.Session()
            self.s3_client = session.client(
                's3',
                endpoint_url=self.config.s3_url
            )
            print(f"Connected to S3 at {self.config.s3_url}")
        except Exception as e:
            print(f"Failed to setup S3 client: {e}")
            raise

    def store_blob(self, sample_id: str, roi_number: int, blob_data: bytes) -> bool:
        """Store a blob in S3."""
        try:
            if sample_id not in self.blob_counts:
                self.blob_counts[sample_id] = 0

            key = f"{self.config.prefix}{sample_id}/{roi_number:05d}.png"

            self.s3_client.put_object(
                Bucket=self.config.bucket_name,
                Key=key,
                Body=blob_data,
                ContentType='image/png'
            )

            self.blob_counts[sample_id] += 1
            return True

        except Exception as e:
            print(f"Error storing blob {roi_number} for sample {sample_id} to S3: {e}")
            traceback.print_exc()
            return False

    def finalize_sample(self, sample_id: str) -> bool:
        """Log completion for this sample."""
        if sample_id in self.blob_counts:
            print(f"Stored {self.blob_counts[sample_id]} blobs for sample {sample_id} in S3")
            del self.blob_counts[sample_id]
        return True

    def cleanup(self):
        """Close S3 client."""
        if self.s3_client:
            try:
                self.s3_client.close()
            except Exception:
                pass


def create_blob_storage(storage_mode: str, output_directory: str, s3_config: S3Config = None) -> BlobStorage:
    """Factory function to create appropriate blob storage backend."""
    if storage_mode == "local":
        return LocalZipStorage(output_directory)
    elif storage_mode == "s3":
        if s3_config is None:
            raise ValueError("S3 configuration required for S3 storage mode")
        return S3BlobStorage(s3_config)
    else:
        raise ValueError(f"Unknown storage mode: {storage_mode}. Use 'local' or 's3'")
174 changes: 144 additions & 30 deletions extract_slim_features.py
@@ -12,8 +12,12 @@
import traceback

from ifcb_features.all import compute_features
from blob_storage import S3Config, create_blob_storage
from feature_storage import VastDBFeatureStorage, VastDBConfig

FEATURE_COLUMNS = [
'sample_id',
'roi_number',
'Area',
'Biovolume',
'BoundingBox_xwidth',
@@ -46,16 +50,20 @@
'summedConvexPerimeter_over_Perimeter'
]

def extract_and_save_all_features(data_directory, output_directory, bins=None):
def extract_and_save_all_features(data_directory, output_directory, bins=None, blob_storage_mode="local", s3_config=None, feature_storage_mode="local", vastdb_config=None):
"""
Extracts slim features from IFCB images in the given directory
and saves them to a CSV file.
and saves them to CSV or VastDB.

Args:
data_directory (str): Path to the directory containing IFCB data.
output_directory (str): Path to the directory where the CSV file will be saved.
output_directory (str): Path to the directory where the CSV file will be saved (if feature_storage_mode=local).
bins (list, optional): A list of bin names (e.g., 'D20240423T115846_IFCB127') to process.
If None, all bins in the data directory are processed. Defaults to None.
blob_storage_mode (str): Storage mode for blobs - "local" or "s3". Defaults to "local".
s3_config (S3Config, optional): S3 configuration when using S3 blob storage.
feature_storage_mode (str): Storage mode for features - "local" or "vastdb". Defaults to "local".
vastdb_config (VastDBConfig, optional): VastDB configuration when using VastDB feature storage.
"""
try:
data_dir = DataDirectory(data_directory)
@@ -87,47 +95,153 @@ def extract_and_save_all_features(data_directory, output_directory, bins=None):
for sample in data_dir:
samples_to_process.append(sample)

for sample in samples_to_process:
all_features = []
all_blobs = {}
features_output_filename = os.path.join(output_directory, f"{sample.lid}_features_v4.csv")
blobs_output_filename = os.path.join(output_directory, f"{sample.lid}_blobs_v4.zip")
for number, image in sample.images.items():
features = {
'roi_number': number,
}
# Validate blob storage configuration
if blob_storage_mode == "s3" and s3_config is None:
raise ValueError("S3 configuration required for S3 blob storage mode")
if blob_storage_mode not in ["local", "s3"]:
raise ValueError(f"Invalid blob storage mode '{blob_storage_mode}'. Use 'local' or 's3'")

# Validate feature storage configuration
if feature_storage_mode == "vastdb" and vastdb_config is None:
raise ValueError("VastDB configuration required for VastDB feature storage mode")
if feature_storage_mode not in ["local", "vastdb"]:
raise ValueError(f"Invalid feature storage mode '{feature_storage_mode}'. Use 'local' or 'vastdb'")

# Create blob storage backend
blob_storage = create_blob_storage(
storage_mode=blob_storage_mode,
output_directory=output_directory,
s3_config=s3_config
)

print(f"Blob storage: {blob_storage_mode}")

# Initialize feature storage
vastdb_storage = None
if feature_storage_mode == "vastdb":
vastdb_storage = VastDBFeatureStorage(vastdb_config)
print(f"Feature storage: vastdb ({vastdb_config.schema_name}.{vastdb_config.table_name})")
else:
print(f"Feature storage: local CSV")

try:
for sample in samples_to_process:
all_features = []
features_output_filename = os.path.join(output_directory, f"{sample.lid}_features_v4.csv")

try:
blobs_image, roi_features = compute_features(image)
features.update(roi_features)
with sample: # Open ROI file
for number, image in sample.images.items():
features = {
'sample_id': sample.lid,
'roi_number': number,
}
try:
blobs_image, roi_features = compute_features(image)
features.update(roi_features)

img_buffer = io.BytesIO()
Image.fromarray((blobs_image > 0).astype(np.uint8) * 255).save(img_buffer, format="PNG")
all_blobs[number] = img_buffer.getvalue()
except Exception as e:
print(f"Error processing ROI {number} in sample {sample.pid}: {e}")
# Store blob using the configured storage backend
img_buffer = io.BytesIO()
Image.fromarray((blobs_image > 0).astype(np.uint8) * 255).save(img_buffer, format="PNG")
blob_data = img_buffer.getvalue()

blob_storage.store_blob(sample.lid, number, blob_data)

except Exception as e:
print(f"Error processing ROI {number} in sample {sample.pid}: {e}")

all_features.append(features)

all_features.append(features)
if all_features:
df = pd.DataFrame.from_records(all_features, columns=FEATURE_COLUMNS)

if all_features:
df = pd.DataFrame.from_records(all_features, columns=['roi_number'] + FEATURE_COLUMNS)
df.to_csv(features_output_filename, index=False)

if all_blobs:
with zipfile.ZipFile(blobs_output_filename, 'w') as zf:
for roi_number, blob_data in all_blobs.items():
filename = f"{sample.lid}_{roi_number:05d}.png"
zf.writestr(filename, blob_data)
# Save features based on storage mode
if feature_storage_mode == "local":
df.to_csv(features_output_filename, index=False)
elif feature_storage_mode == "vastdb":
vastdb_storage.insert_features(df)

# Finalize blob storage for this sample
blob_storage.finalize_sample(sample.lid)

except Exception as e:
print(f"Error processing sample {sample.pid}: {e}")
traceback.print_exc()
continue

finally:
# Cleanup storage resources
blob_storage.cleanup()
if vastdb_storage:
vastdb_storage.cleanup()

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Extract various ROI features and save blobs as 1-bit PNGs.")
parser.add_argument("data_directory", help="Path to the directory containing IFCB data.")
parser.add_argument("output_directory", help="Path to the directory to save the output CSV file and blobs.")
parser.add_argument("--bins", nargs='+', help="List of bin names to process (space-separated). If not provided, all bins are processed.")

# Blob storage options
parser.add_argument("--blob-storage-mode", choices=["local", "s3"], default="local",
help="Storage mode for blob images (default: local)")
parser.add_argument("--s3-bucket", help="S3 bucket name (required when blob-storage-mode=s3)")
parser.add_argument("--s3-url", help="S3 endpoint URL (required when blob-storage-mode=s3)")
parser.add_argument("--s3-prefix", default="ifcb-blobs-slim-features/",
help="S3 key prefix for blob storage (default: ifcb-blobs-slim-features/)")

# Feature storage options
parser.add_argument("--feature-storage-mode", choices=["local", "vastdb"], default="local",
help="Storage mode for features (default: local)")
parser.add_argument("--vastdb-bucket", help="VastDB bucket name (required when feature-storage-mode=vastdb)")
parser.add_argument("--vastdb-schema", help="VastDB schema name (required when feature-storage-mode=vastdb)")
parser.add_argument("--vastdb-table", help="VastDB table name (required when feature-storage-mode=vastdb)")
parser.add_argument("--vastdb-url", help="VastDB endpoint URL (defaults to s3-url if not provided)")
parser.add_argument("--vastdb-access-key", help="VastDB access key (uses AWS_ACCESS_KEY_ID env var if not provided)")
parser.add_argument("--vastdb-secret-key", help="VastDB secret key (uses AWS_SECRET_ACCESS_KEY env var if not provided)")

args = parser.parse_args()

# Set up S3 configuration if using S3 blob storage
if args.blob_storage_mode == "s3":
if not args.s3_bucket or not args.s3_url:
parser.error("--s3-bucket and --s3-url are required when using --blob-storage-mode=s3")
s3_config = S3Config(
bucket_name=args.s3_bucket,
s3_url=args.s3_url,
prefix=args.s3_prefix
)
else:
s3_config = None

# Set up VastDB configuration if using VastDB feature storage
vastdb_config = None
if args.feature_storage_mode == "vastdb":
if not args.vastdb_bucket or not args.vastdb_schema or not args.vastdb_table:
parser.error("--vastdb-bucket, --vastdb-schema, and --vastdb-table are required when using --feature-storage-mode=vastdb")

# Use provided endpoint or fall back to S3 URL
vastdb_url = args.vastdb_url or args.s3_url
if not vastdb_url:
parser.error("--vastdb-url or --s3-url must be provided when using --feature-storage-mode=vastdb")

# Get credentials from args or environment
access_key = args.vastdb_access_key or os.environ.get('AWS_ACCESS_KEY_ID')
secret_key = args.vastdb_secret_key or os.environ.get('AWS_SECRET_ACCESS_KEY')

if not access_key or not secret_key:
parser.error("VastDB credentials required: provide --vastdb-access-key/--vastdb-secret-key or set AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY environment variables")

vastdb_config = VastDBConfig(
bucket_name=args.vastdb_bucket,
schema_name=args.vastdb_schema,
table_name=args.vastdb_table,
endpoint_url=vastdb_url,
access_key=access_key,
secret_key=secret_key
)

beginning = time.time()
extract_and_save_all_features(args.data_directory, args.output_directory, args.bins)
extract_and_save_all_features(args.data_directory, args.output_directory, args.bins, args.blob_storage_mode, s3_config, args.feature_storage_mode, vastdb_config)
elapsed = time.time() - beginning

print(f'Total extract time: {elapsed:.2f} seconds')
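
For reference, a hedged sketch of invoking the updated entry point programmatically with both remote backends enabled; every endpoint URL, bucket, schema, table, path, and credential below is a placeholder, not something defined in this change:

# Illustration only: all names, endpoints, and credentials here are hypothetical.
from blob_storage import S3Config
from feature_storage import VastDBConfig
from extract_slim_features import extract_and_save_all_features

s3_config = S3Config(bucket_name="example-bucket", s3_url="https://s3.example.org")
vastdb_config = VastDBConfig(
    bucket_name="example-db-bucket",
    schema_name="ifcb",
    table_name="slim_features",
    endpoint_url="https://vastdb.example.org",
    access_key="EXAMPLE_ACCESS_KEY",
    secret_key="EXAMPLE_SECRET_KEY",
)
extract_and_save_all_features(
    "/data/ifcb", "/output",
    bins=["D20240423T115846_IFCB127"],
    blob_storage_mode="s3", s3_config=s3_config,
    feature_storage_mode="vastdb", vastdb_config=vastdb_config,
)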