diff --git a/alert_system/admin.py b/alert_system/admin.py index 281387225..8e0608ea3 100644 --- a/alert_system/admin.py +++ b/alert_system/admin.py @@ -15,7 +15,7 @@ class EventAdmin(admin.ModelAdmin): "stac_id", "created_at", "collection", - "correlation_id", + "guid", ) list_filter = ("connector", "collection") readonly_fields = ("connector",) @@ -31,7 +31,7 @@ class LoadItemAdmin(admin.ModelAdmin): "id", "event_title", "created_at", - "correlation_id", + "guid", "item_eligible", "is_past_event", ) @@ -56,11 +56,11 @@ class LoadItemAdmin(admin.ModelAdmin): class AlertEmailThreadAdmin(admin.ModelAdmin): list_display = ( "user", - "correlation_id", + "parent_guid", "root_email_message_id", ) search_fields = ( - "correlation_id", + "parent_guid", "root_email_message_id", "user__username", ) diff --git a/alert_system/email_processing.py b/alert_system/email_processing.py new file mode 100644 index 000000000..3ecff07b6 --- /dev/null +++ b/alert_system/email_processing.py @@ -0,0 +1,155 @@ +import logging +import uuid +from typing import Optional + +from django.contrib.auth.models import User +from django.db.models import Count +from django.template.loader import render_to_string +from django.utils import timezone + +from alert_system.models import AlertEmailLog, AlertEmailThread, LoadItem +from alert_system.utils import get_alert_email_context, get_alert_subscriptions +from notifications.models import AlertSubscription +from notifications.notification import send_notification + +logger = logging.getLogger(__name__) + + +def send_alert_email_notification( + load_item: LoadItem, + user: User, + subscription: AlertSubscription, + thread: Optional[AlertEmailThread], + is_reply: bool = False, +) -> None: + """Helper function to send email and create log entry""" + message_id: str = str(uuid.uuid4()) + + email_log = AlertEmailLog.objects.create( + user=user, + subscription=subscription, + item=load_item, + status=AlertEmailLog.Status.PROCESSING, + message_id=message_id, + 
thread=thread, + ) + + try: + if is_reply: + subject = f"Re: Hazard Alert: {load_item.event_title}" + template = "email/alert_system/alert_notification_reply.html" + email_type = "Alert Email Notification Reply" + in_reply_to = thread.root_email_message_id + else: + subject = f"New Hazard Alert: {load_item.event_title}" + template = "email/alert_system/alert_notification.html" + email_type = "Alert Email Notification" + in_reply_to = None + + email_context = get_alert_email_context(load_item, user) + email_body = render_to_string(template, email_context) + + send_notification( + subject=subject, + recipients=user.email, + message_id=message_id, + in_reply_to=in_reply_to, + html=email_body, + mailtype=email_type, + ) + + email_log.status = AlertEmailLog.Status.SENT + email_log.email_sent_at = timezone.now() + email_log.save(update_fields=["status", "email_sent_at"]) + + # Create thread for initial emails + if not is_reply: + thread = AlertEmailThread.objects.create( + user=user, + parent_guid=load_item.parent_guid, + root_email_message_id=message_id, + root_message_sent_at=timezone.now(), + ) + email_log.thread = thread + email_log.save(update_fields=["thread"]) + logger.info( + f"Alert Email thread created for user [{user.get_full_name()}] " f"with parent_guid [{load_item.parent_guid}]" + ) + + logger.info(f"Alert email sent to [{user.get_full_name()}] for LoadItem ID [{load_item.id}]") + + except Exception: + email_log.status = AlertEmailLog.Status.FAILED + email_log.save(update_fields=["status"]) + logger.warning(f"Alert email failed for [{user.get_full_name()}] LoadItem ID [{load_item.id}]", exc_info=True) + + +def process_email_alert(load_item_id: int) -> None: + load_item = LoadItem.objects.select_related("connector", "connector__dtype").filter(id=load_item_id).first() + + if not load_item: + logger.warning(f"LoadItem with ID [{load_item_id}] not found") + return + + subscriptions = list(get_alert_subscriptions(load_item)) + if not subscriptions: + 
logger.info(f"No alert subscriptions matched for LoadItem ID [{load_item_id}]") return today = timezone.now().date() user_ids = [sub.user_id for sub in subscriptions] subscription_ids = [sub.id for sub in subscriptions] # Daily email counts per user daily_counts = ( AlertEmailLog.objects.filter( user_id__in=user_ids, subscription_id__in=subscription_ids, status=AlertEmailLog.Status.SENT, email_sent_at__date=today, ) .values("user_id", "subscription_id") .annotate(sent_count=Count("id")) ) daily_count_map = {(item["user_id"], item["subscription_id"]): item["sent_count"] for item in daily_counts} # Emails already sent for this item (per user) already_sent = set( AlertEmailLog.objects.filter( user_id__in=user_ids, subscription_id__in=subscription_ids, item_id=load_item_id, status=AlertEmailLog.Status.SENT, ).values_list("user_id", "subscription_id") ) # Existing threads for this parent_guid existing_threads = { thread.user_id: thread for thread in AlertEmailThread.objects.filter( parent_guid=load_item.parent_guid, user_id__in=user_ids, ) } for subscription in subscriptions: user = subscription.user user_id: int = user.id subscription_id: int = subscription.id # Reply if this specific user has an existing thread thread = existing_threads.get(user_id) is_reply: bool = thread is not None # Skip if daily alert limit reached sent_today: int = daily_count_map.get((user_id, subscription_id), 0) if subscription.alert_per_day and sent_today >= subscription.alert_per_day: logger.info(f"Daily alert limit reached for user [{user.get_full_name()}]") continue # Skip duplicate emails for same item if (user_id, subscription_id) in already_sent: logger.info(f"Duplicate alert skipped for user [{user.get_full_name()}] " f"with LoadItem ID [{load_item_id}]") continue send_alert_email_notification(load_item=load_item, user=user, subscription=subscription, 
thread=thread, is_reply=is_reply) diff --git a/alert_system/etl/base/config.py b/alert_system/etl/base/config.py index 124a3a986..8f3d90bef 100644 --- a/alert_system/etl/base/config.py +++ b/alert_system/etl/base/config.py @@ -11,3 +11,4 @@ class ExtractionConfig(TypedDict): filter_impact: Dict | None people_exposed_threshold: int + forecasted_data: bool diff --git a/alert_system/etl/base/extraction.py b/alert_system/etl/base/extraction.py index 3c99cea57..40062df58 100644 --- a/alert_system/etl/base/extraction.py +++ b/alert_system/etl/base/extraction.py @@ -40,6 +40,7 @@ class BaseExtractionClass(ABC): filter_event: Optional[Dict] = None filter_hazard: Optional[Dict] = None filter_impact: Optional[Dict] = None + forecasted_data: bool | None = False config: ExtractionConfig @@ -86,13 +87,16 @@ def fetch_stac_data(url: str, filters: Optional[Dict] = None, timeout: int | Non current_payload = None # Only use params on first request def _get_correlation_id(self, feature: Dict) -> str: - """Extract correlation ID from feature properties.""" return feature.get("properties", {}).get("monty:corr_id") + def _get_guid(self, feature: Dict) -> str: + return feature.get("properties", {}).get("monty:guid") + def _build_base_defaults(self, feature: Dict, run_id: str, collection_type: ExtractionItem.CollectionType) -> Dict: """Build common default fields for all STAC items.""" return { - "correlation_id": self._get_correlation_id(feature), + "guid": self._get_guid(feature=feature), + "correlation_id": self._get_correlation_id(feature=feature), "resp_data": feature, "connector": self.connector, "extraction_run_id": run_id, @@ -138,7 +142,8 @@ def _extract_impact_items(self, stac_obj: ExtractionItem, run_id: str) -> List[E self.base_url, build_stac_search( collections=self.impact_collection_type, - correlation_id=stac_obj.correlation_id, + guid=stac_obj.guid, + forecasted_data=self.forecasted_data, ), ) except Exception as e: @@ -168,7 +173,7 @@ def _extract_hazard_items(self, 
stac_obj: ExtractionItem, run_id: str) -> Extrac self.base_url, build_stac_search( collections=self.hazard_collection_type, - correlation_id=stac_obj.correlation_id, + guid=stac_obj.guid, ), ) except Exception as e: @@ -189,7 +194,7 @@ def _extract_hazard_items(self, stac_obj: ExtractionItem, run_id: str) -> Extrac hazard_obj = self._save_stac_item(hazard_id, defaults) return hazard_obj - def process_event_items(self, extraction_run_id: str, correlation_id: str | None = None, is_past_event: bool = False) -> None: + def process_event_items(self, extraction_run_id: str, guid: str | None = None, is_past_event: bool = False) -> None: """Process all event items from the connector source.""" filters = [] if self.filter_event: @@ -205,7 +210,7 @@ def process_event_items(self, extraction_run_id: str, correlation_id: str | None build_stac_search( collections=self.event_collection_type, additional_filters=filters, - correlation_id=correlation_id, + guid=guid, datetime_range=None if is_past_event else self.get_datetime_filter(), ), ) @@ -246,10 +251,10 @@ def process_event_items(self, extraction_run_id: str, correlation_id: str | None logger.warning(f"Failed to process event {event_id}: {e}", exc_info=True) raise - def run(self, extraction_run_id: str, correlation_id: str | None = None, is_past_event: bool = False) -> None: + def run(self, extraction_run_id: str, guid: str | None = None, is_past_event: bool = False) -> None: """Main entry point for running the connector.""" try: - self.process_event_items(extraction_run_id, correlation_id, is_past_event) + self.process_event_items(extraction_run_id, guid, is_past_event) except Exception as e: logger.warning(f"Connector run failed: {e}", exc_info=True) raise @@ -280,24 +285,19 @@ def _country_filter(self, country_codes) -> list[str]: filters.append(f"({country_cql})") return filters - def _hazard_filter(self, unit: str | None, value: int | None) -> Optional[str]: - if not unit or value is None: - return None - return 
f"monty:hazard_detail.severity_unit = '{unit}' AND " f"monty:hazard_detail.severity_value >= {value}" - - def _collect_corr_ids(self, features, exclude: str) -> set[str]: - corr_ids = set() + def _collect_guids(self, features, exclude: str) -> set[str]: + guids = set() for feature in features or []: - corr_id = self.extractor._get_correlation_id(feature) - if corr_id and corr_id != exclude: - corr_ids.add(corr_id) - return corr_ids + guid = self.extractor._get_guid(feature) + if guid and guid != exclude: + guids.add(guid) + return guids - def find_related_corr_ids(self, load_obj: LoadItem) -> set[str]: + def find_related_guids(self, load_obj: LoadItem) -> set[str]: start = timezone.now() - timedelta(weeks=self.extractor.connector.lookback_weeks) end = timezone.now() - corr_ids = set() + guids = set() if self.extractor.impact_collection_type: impact_filter = self._impact_filter(load_obj.impact_metadata) @@ -316,50 +316,34 @@ def find_related_corr_ids(self, load_obj: LoadItem) -> set[str]: collections=self.extractor.impact_collection_type, additional_filters=additional_filters, datetime_range=f"{start.isoformat()}/{end.isoformat()}", + forecasted_data=self.extractor.forecasted_data, ), ) - corr_ids |= self._collect_corr_ids(features, load_obj.correlation_id) - - # NOTE: Returns too many correlation_ids. 
- # if self.extractor.hazard_collection_type: - # hazard_filter = self._hazard_filter( - # load_obj.severity_unit, - # load_obj.severity_value, - # ) - # features = self.extractor.fetch_stac_data( - # self.base_url, - # build_stac_search( - # collections=self.extractor.hazard_collection_type, - # additional_filters=[hazard_filter], - # datetime_range=f"{start.isoformat()}/{end.isoformat()}", - # ), - # ) - # corr_ids |= self._collect_corr_ids(features, load_obj.correlation_id) - - return corr_ids + guids |= self._collect_guids(features, load_obj.guid) + return guids def extract_past_events(self, load_obj: LoadItem) -> None: - corr_ids = self.find_related_corr_ids(load_obj) + guids = self.find_related_guids(load_obj) - if not corr_ids: + if not guids: return - existing_items = LoadItem.objects.filter(correlation_id__in=corr_ids) - existing_map = {i.correlation_id: i for i in existing_items} + existing_items = LoadItem.objects.filter(guid__in=guids) + existing_map = {i.guid: i for i in existing_items} related_ids = [] - for corr_id in corr_ids: - item = existing_map.get(corr_id) + for guid in guids: + item = existing_map.get(guid) if not item: self.extractor.run( extraction_run_id=load_obj.extraction_run_id, - correlation_id=corr_id, + guid=guid, is_past_event=True, ) - item = LoadItem.objects.filter(correlation_id=corr_id).first() + item = LoadItem.objects.filter(guid=guid).first() if item: related_ids.append(item.id) diff --git a/alert_system/etl/base/loader.py b/alert_system/etl/base/loader.py index 8230c2b3e..279f3420e 100644 --- a/alert_system/etl/base/loader.py +++ b/alert_system/etl/base/loader.py @@ -14,6 +14,16 @@ class BaseLoaderClass(ABC): def filter_eligible_items(self, load_obj): raise NotImplementedError() + def extract_parent_guid(self, guid: str) -> str: + parts = guid.split("-") + + BASE_PART_COUNT = 8 + + if len(parts) > BASE_PART_COUNT: + return "-".join(parts[:BASE_PART_COUNT]) + + return guid + def load(self, transformed_data: Dict, connector: 
Connector, run_id: str, is_past_event: bool = False) -> LoadItem: """ Save aggregated event. @@ -23,15 +33,18 @@ def load(self, transformed_data: Dict, connector: Connector, run_id: str, is_pas connector: The connector this data came from Returns: - Created DisasterEvent object + Created LoadItem object """ - correlation_id = transformed_data["correlation_id"] + guid = transformed_data["guid"] + parent_guid = self.extract_parent_guid(guid) is_item_eligible = self.filter_eligible_items(transformed_data) load_obj, created = LoadItem.objects.update_or_create( - correlation_id=correlation_id, + guid=guid, defaults={ "connector": connector, + "parent_guid": parent_guid, + "correlation_id": transformed_data.get("correlation_id"), "event_title": transformed_data.get("title"), "event_description": transformed_data.get("description"), "country_codes": transformed_data.get("country"), @@ -50,6 +63,6 @@ def load(self, transformed_data: Dict, connector: Connector, run_id: str, is_pas ) action = "Created" if created else "Updated" - logger.info(f"{action} Event for {correlation_id=}") + logger.info(f"{action} Event for {guid=}") return load_obj diff --git a/alert_system/etl/base/transform.py b/alert_system/etl/base/transform.py index 524eedc71..c5c457b9f 100644 --- a/alert_system/etl/base/transform.py +++ b/alert_system/etl/base/transform.py @@ -34,6 +34,7 @@ def __init__( self.hazard_obj = hazard_obj self.impact_obj = impact_obj self.correlation_id = event_obj.correlation_id + self.guid = event_obj.guid @abstractmethod def process_hazard(self, hazard_item: ExtractionItem | None) -> HazardType: @@ -62,6 +63,7 @@ def transform_stac_item(self): impact_result = self.process_impact(self.impact_obj) return { + "guid": self.guid, "correlation_id": self.correlation_id, **event_result, **hazard_result, diff --git a/alert_system/etl/gdacs_flood/config.py b/alert_system/etl/gdacs_flood/config.py index d55b64c89..4f19b0c90 100644 --- a/alert_system/etl/gdacs_flood/config.py +++ 
b/alert_system/etl/gdacs_flood/config.py @@ -9,4 +9,5 @@ "filter_hazard": None, "filter_impact": None, "people_exposed_threshold": 500, + "forecasted_data": False, } diff --git a/alert_system/etl/gdacs_flood/loader.py b/alert_system/etl/gdacs_flood/loader.py index 341ae7a7f..db6b2fd32 100644 --- a/alert_system/etl/gdacs_flood/loader.py +++ b/alert_system/etl/gdacs_flood/loader.py @@ -5,7 +5,6 @@ class GdacsLoader(BaseLoaderClass): - # NOTE: Add additional changes to the filter here. This is example only. def filter_eligible_items(self, load_obj): people_exposed = load_obj.get("people_exposed") if people_exposed is None: diff --git a/alert_system/etl/usgs_earthquake/config.py b/alert_system/etl/usgs_earthquake/config.py index 5edac2358..6995cd818 100644 --- a/alert_system/etl/usgs_earthquake/config.py +++ b/alert_system/etl/usgs_earthquake/config.py @@ -8,4 +8,5 @@ "filter_hazard": None, "filter_impact": None, "people_exposed_threshold": 500, + "forecasted_data": True, } diff --git a/alert_system/factories.py b/alert_system/factories.py index ca9b956ba..0bddd70df 100644 --- a/alert_system/factories.py +++ b/alert_system/factories.py @@ -1,9 +1,13 @@ +from uuid import uuid4 + import factory from alert_system.models import AlertEmailLog, AlertEmailThread, Connector, LoadItem class LoadItemFactory(factory.django.DjangoModelFactory): + guid = factory.LazyFunction(lambda: str(uuid4())) + class Meta: model = LoadItem diff --git a/alert_system/helpers.py b/alert_system/helpers.py index f9464dd1a..1d136b3b4 100644 --- a/alert_system/helpers.py +++ b/alert_system/helpers.py @@ -25,24 +25,28 @@ def build_search_params( return params -def build_correlation_filter(correlation_id: str | None) -> str | None: - if correlation_id: - return f"monty:corr_id = '{correlation_id}'" - return None +def build_guid_filter(guid: str) -> str: + return f"monty:guid = '{guid}'" + + +def build_forecasted_filter(forecasted: bool): + return f"forecasted = {forecasted}" def build_stac_search( 
collections: str, - correlation_id: str | None = None, + guid: str | None = None, additional_filters: list[str] | None = None, datetime_range: str | None = None, extra_params: dict | None = None, + forecasted_data: bool | None = False, ) -> dict: filters = additional_filters.copy() if additional_filters else [] - corr_filter = build_correlation_filter(correlation_id) - if corr_filter: - filters.append(corr_filter) + if forecasted_data: + filters.append(build_forecasted_filter(forecasted_data)) + if guid: + filters.append(build_guid_filter(guid)) return build_search_params( collections=collections, diff --git a/alert_system/management/commands/alert_notification.py b/alert_system/management/commands/alert_notification.py index 0b3c4e675..3ea48cdb2 100644 --- a/alert_system/management/commands/alert_notification.py +++ b/alert_system/management/commands/alert_notification.py @@ -1,10 +1,15 @@ from django.core.management.base import BaseCommand from sentry_sdk import monitor +from alert_system.email_processing import process_email_alert from alert_system.models import LoadItem -from alert_system.tasks import process_email_alert from main.sentry import SentryMonitor +# NOTE: Disabled parallel processing to avoid inconsistent state and keep +# execution deterministic. Email logic is intentionally moved out of tasks.py for now. +# If reintroduced later, a Celery chain may be used to ensure proper ordering +# and retry management. 
+ class Command(BaseCommand): help = "Send alert email notifications for eligible load items" @@ -18,9 +23,8 @@ def handle(self, *args, **options): self.stdout.write(self.style.NOTICE("No eligible items found")) return - self.stdout.write(self.style.NOTICE(f"Queueing {items.count()} items for alert email notification.")) + self.stdout.write(self.style.NOTICE(f"Processing {items.count()} items for alert email notification.")) for item in items.iterator(): - process_email_alert.delay(load_item_id=item.id) - - self.stdout.write(self.style.SUCCESS("All alert notification email queued successfully")) + process_email_alert(load_item_id=item.id) + self.stdout.write(self.style.SUCCESS("All alert notification emails processed successfully")) diff --git a/alert_system/migrations/0001_initial.py b/alert_system/migrations/0001_initial.py index 3ceef89f3..0fe119009 100644 --- a/alert_system/migrations/0001_initial.py +++ b/alert_system/migrations/0001_initial.py @@ -1,5 +1,6 @@ -# Generated by Django 4.2.26 on 2026-01-09 06:03 +# Generated by Django 4.2.26 on 2026-02-03 11:28 +from django.conf import settings import django.contrib.postgres.fields from django.db import migrations, models import django.db.models.deletion @@ -11,6 +12,8 @@ class Migration(migrations.Migration): dependencies = [ ('api', '0226_nsdinitiativescategory_and_more'), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ('notifications', '0016_alertsubscription'), ] operations = [ @@ -29,7 +32,7 @@ class Migration(migrations.Migration): options={ 'verbose_name': 'Connector', 'verbose_name_plural': 'Connectors', - 'ordering': ['type'], + 'ordering': ['id', 'type'], }, ), migrations.CreateModel( @@ -39,6 +42,8 @@ class Migration(migrations.Migration): ('extraction_run_id', models.UUIDField(blank=True, db_index=True, help_text='UUID field for tracking the extraction run through ETL pipeline', null=True)), ('created_at', models.DateTimeField(auto_now_add=True, help_text='Timestamp when the record was 
created', verbose_name='Created At')), ('correlation_id', models.CharField(help_text='Correlation identifier linking all models', max_length=255, verbose_name='Correlation ID')), + ('guid', models.CharField(help_text='Globally unique ID for events', verbose_name='GUID')), + ('parent_guid', models.CharField(help_text='GUID without the episode number.', verbose_name='Parent GUID')), ('event_title', models.CharField(help_text='Title of the event', max_length=255, verbose_name='Event Title')), ('event_description', models.TextField(help_text='Description of the event', verbose_name='Event Description')), ('start_datetime', models.DateTimeField(help_text='Start datetime of the event')), @@ -68,6 +73,7 @@ class Migration(migrations.Migration): ('extraction_run_id', models.UUIDField(blank=True, db_index=True, help_text='UUID field for tracking the extraction run through ETL pipeline', null=True)), ('created_at', models.DateTimeField(auto_now_add=True, help_text='Timestamp when the record was created', verbose_name='Created At')), ('correlation_id', models.CharField(help_text='Correlation identifier linking all models', max_length=255, verbose_name='Correlation ID')), + ('guid', models.CharField(help_text='Globally unique ID for events', verbose_name='GUID')), ('collection', models.IntegerField(choices=[(100, 'event'), (200, 'Hazard'), (300, 'Impacts')], help_text='Collection type of the item', verbose_name='Collection')), ('stac_id', models.CharField(db_index=True, help_text='Unique identifier for the event item', max_length=255, unique=True, verbose_name='Event ID')), ('resp_data', models.JSONField(blank=True, help_text='Raw JSON response from the STAC API', null=True, verbose_name='Response Data')), @@ -78,4 +84,58 @@ class Migration(migrations.Migration): 'abstract': False, }, ), + migrations.CreateModel( + name='AlertEmailThread', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('parent_guid', 
models.CharField(help_text='Identifier linking related LoadItems into the same email thread.')), + ('root_email_message_id', models.CharField(help_text='Message-ID of the first email in this thread.', max_length=255, unique=True)), + ('root_message_sent_at', models.DateTimeField(help_text='Timestamp when the root email was sent.')), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='alert_email_threads', to=settings.AUTH_USER_MODEL)), + ], + options={ + 'verbose_name': 'Email Thread', + 'verbose_name_plural': 'Email Threads', + 'ordering': ['-id'], + }, + ), + migrations.CreateModel( + name='AlertEmailLog', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('message_id', models.CharField(help_text='Unique Message-ID of email for tracking and threading.', max_length=255, unique=True, verbose_name='Message ID')), + ('status', models.IntegerField(choices=[(100, 'Pending'), (200, 'Processing'), (300, 'Sent'), (400, 'Failed')], db_index=True, default=100, verbose_name='Email Status')), + ('email_sent_at', models.DateTimeField(blank=True, help_text='Timestamp when email was successfully sent.', null=True, verbose_name='Sent At')), + ('item', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='email_alert_load_item', to='alert_system.loaditem', verbose_name='Load Item')), + ('subscription', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='email_alert_subscription', to='notifications.alertsubscription', verbose_name='Alert Subscription')), + ('thread', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='email_alert_thread', to='alert_system.alertemailthread', verbose_name='Email Thread')), + ('user', models.ForeignKey(help_text='The recipient of this alert email.', on_delete=django.db.models.deletion.CASCADE, 
to=settings.AUTH_USER_MODEL, verbose_name='User')), + ], + options={ + 'verbose_name': 'Email Alert Log', + 'verbose_name_plural': 'Email Alert Logs', + 'ordering': ['-id'], + }, + ), + migrations.AddConstraint( + model_name='loaditem', + constraint=models.UniqueConstraint(fields=('guid',), name='unique_guid'), + ), + migrations.AddIndex( + model_name='alertemailthread', + index=models.Index(fields=['parent_guid', 'user'], name='alert_syste_parent__737a31_idx'), + ), + migrations.AddConstraint( + model_name='alertemailthread', + constraint=models.UniqueConstraint(fields=('parent_guid', 'user'), name='unique_user_guid'), + ), + migrations.AddIndex( + model_name='alertemaillog', + index=models.Index(fields=['user', 'subscription', 'email_sent_at'], name='alert_syste_user_id_06a2ee_idx'), + ), + migrations.AddIndex( + model_name='alertemaillog', + index=models.Index(fields=['user', 'item', 'status'], name='alert_syste_user_id_e51594_idx'), + ), ] diff --git a/alert_system/migrations/0002_alertemailthread_alertemaillog_and_more.py b/alert_system/migrations/0002_alertemailthread_alertemaillog_and_more.py deleted file mode 100644 index 1344f5417..000000000 --- a/alert_system/migrations/0002_alertemailthread_alertemaillog_and_more.py +++ /dev/null @@ -1,63 +0,0 @@ -# Generated by Django 4.2.26 on 2026-01-09 11:16 - -from django.conf import settings -from django.db import migrations, models -import django.db.models.deletion - - -class Migration(migrations.Migration): - - dependencies = [ - ('notifications', '0016_alertsubscription'), - migrations.swappable_dependency(settings.AUTH_USER_MODEL), - ('alert_system', '0001_initial'), - ] - - operations = [ - migrations.CreateModel( - name='AlertEmailThread', - fields=[ - ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('correlation_id', models.CharField(help_text='Identifier linking related LoadItems into the same email thread.', max_length=255)), - 
('root_email_message_id', models.CharField(help_text='Message-ID of the first email in this thread.', max_length=255, unique=True)), - ('root_message_sent_at', models.DateTimeField(help_text='Timestamp when the root email was sent.')), - ('created_at', models.DateTimeField(auto_now_add=True)), - ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='alert_email_threads', to=settings.AUTH_USER_MODEL)), - ], - options={ - 'verbose_name': 'Email Thread', - 'verbose_name_plural': 'Email Threads', - 'ordering': ['-id'], - }, - ), - migrations.CreateModel( - name='AlertEmailLog', - fields=[ - ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('message_id', models.CharField(help_text='Unique Message-ID of email for tracking and threading.', max_length=255, unique=True, verbose_name='Message ID')), - ('status', models.IntegerField(choices=[(100, 'Pending'), (200, 'Processing'), (300, 'Sent'), (400, 'Failed')], db_index=True, default=100, verbose_name='Email Status')), - ('email_sent_at', models.DateTimeField(blank=True, help_text='Timestamp when email was successfully sent.', null=True, verbose_name='Sent At')), - ('item', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='email_alert_load_item', to='alert_system.loaditem', verbose_name='Load Item')), - ('subscription', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='email_alert_subscription', to='notifications.alertsubscription', verbose_name='Alert Subscription')), - ('thread', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='email_alert_thread', to='alert_system.alertemailthread', verbose_name='Email Thread')), - ('user', models.ForeignKey(help_text='The recipient of this alert email.', on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='User')), - ], - options={ - 'verbose_name': 'Email Alert 
Log', - 'verbose_name_plural': 'Email Alert Logs', - 'ordering': ['-id'], - }, - ), - migrations.AddIndex( - model_name='alertemailthread', - index=models.Index(fields=['correlation_id', 'user'], name='alert_syste_correla_2e9c7b_idx'), - ), - migrations.AddIndex( - model_name='alertemaillog', - index=models.Index(fields=['user', 'subscription', 'email_sent_at'], name='alert_syste_user_id_06a2ee_idx'), - ), - migrations.AddIndex( - model_name='alertemaillog', - index=models.Index(fields=['user', 'item', 'status'], name='alert_syste_user_id_e51594_idx'), - ), - ] diff --git a/alert_system/migrations/0003_alter_connector_options.py b/alert_system/migrations/0003_alter_connector_options.py deleted file mode 100644 index 4a8ee6afd..000000000 --- a/alert_system/migrations/0003_alter_connector_options.py +++ /dev/null @@ -1,17 +0,0 @@ -# Generated by Django 4.2.26 on 2026-01-22 11:24 - -from django.db import migrations - - -class Migration(migrations.Migration): - - dependencies = [ - ('alert_system', '0002_alertemailthread_alertemaillog_and_more'), - ] - - operations = [ - migrations.AlterModelOptions( - name='connector', - options={'ordering': ['id', 'type'], 'verbose_name': 'Connector', 'verbose_name_plural': 'Connectors'}, - ), - ] diff --git a/alert_system/models.py b/alert_system/models.py index 52d060f4a..74884eb8d 100644 --- a/alert_system/models.py +++ b/alert_system/models.py @@ -135,6 +135,11 @@ class BaseItem(models.Model): help_text=_("Correlation identifier linking all models"), ) + guid = models.CharField( + verbose_name=_("GUID"), + help_text=_("Globally unique ID for events"), + ) + id: int class Meta: @@ -179,6 +184,11 @@ class LoadItem(BaseItem): Model for Load items. """ + parent_guid = models.CharField( + verbose_name=_("Parent GUID"), + help_text=_("GUID without the episode number."), + ) + # TODO: New id to be used in the future. 
event_title = models.CharField( max_length=255, @@ -254,6 +264,9 @@ def __str__(self): class Meta: verbose_name = _("Eligible Item") verbose_name_plural = _("Eligible Items") + constraints = [ + models.UniqueConstraint(fields=["guid"], name="unique_guid") + ] # NOTE: GUID should be unique in the load table. class AlertEmailThread(models.Model): @@ -267,8 +280,7 @@ class AlertEmailThread(models.Model): related_name="alert_email_threads", ) - correlation_id = models.CharField( - max_length=255, + parent_guid = models.CharField( help_text=_("Identifier linking related LoadItems into the same email thread."), ) @@ -291,12 +303,13 @@ class Meta: verbose_name = _("Email Thread") verbose_name_plural = _("Email Threads") ordering = ["-id"] + constraints = [models.UniqueConstraint(fields=["parent_guid", "user"], name="unique_user_guid")] indexes = [ - models.Index(fields=["correlation_id", "user"]), + models.Index(fields=["parent_guid", "user"]), ] def __str__(self): - return f"Thread: {self.user.get_full_name()}-{self.correlation_id}" + return f"Thread: {self.user.get_full_name()}-{self.parent_guid}" class AlertEmailLog(models.Model): diff --git a/alert_system/tasks.py b/alert_system/tasks.py index b81b06a82..c4646b318 100644 --- a/alert_system/tasks.py +++ b/alert_system/tasks.py @@ -5,14 +5,12 @@ from celery import chain, group, shared_task from celery.exceptions import MaxRetriesExceededError from django.db import transaction -from django.db.models import Count, Max -from django.utils import timezone +from django.db.models import Max from alert_system.etl.base.extraction import PastEventExtractionClass -from alert_system.utils import get_alert_subscriptions, send_alert_email_notification from api.models import Event -from .models import AlertEmailLog, AlertEmailThread, Connector, LoadItem +from .models import Connector, LoadItem logger = logging.getLogger(__name__) @@ -175,76 +173,3 @@ def process_connector_task(connector_id): return chain( polling_task.s(connector_id), 
group(fetch_past_events_from_go.s(connector_id), fetch_past_events_from_monty.s()) ).apply_async() - - -# NOTE: Left on the default Celery queue for now; may need a dedicated queue in future. -@shared_task() -def process_email_alert(load_item_id: int) -> None: - load_item = LoadItem.objects.select_related("connector", "connector__dtype").filter(id=load_item_id).first() - - if not load_item: - logger.warning(f"LoadItem with ID [{load_item_id}] not found") - return - - subscriptions = list(get_alert_subscriptions(load_item)) - if not subscriptions: - logger.info(f"No alert subscriptions matched for LoadItem ID [{load_item_id}]") - return - - today = timezone.now().date() - user_ids = [sub.user_id for sub in subscriptions] - subscription_ids = [sub.id for sub in subscriptions] - - # Daily email counts per user - daily_counts = ( - AlertEmailLog.objects.filter( - user_id__in=user_ids, - subscription_id__in=subscription_ids, - status=AlertEmailLog.Status.SENT, - email_sent_at__date=today, - ) - .values("user_id", "subscription_id") - .annotate(sent_count=Count("id")) - ) - daily_count_map = {(item["user_id"], item["subscription_id"]): item["sent_count"] for item in daily_counts} - - # Emails already sent for this item (per user) - already_sent = set( - AlertEmailLog.objects.filter( - user_id__in=user_ids, - subscription_id__in=subscription_ids, - item_id=load_item_id, - status=AlertEmailLog.Status.SENT, - ).values_list("user_id", "subscription_id") - ) - - # Existing threads for this correlation_id - existing_threads = { - thread.user_id: thread - for thread in AlertEmailThread.objects.filter( - correlation_id=load_item.correlation_id, - user_id__in=user_ids, - ) - } - - for subscription in subscriptions: - user = subscription.user - user_id: int = user.id - subscription_id: int = subscription.id - - # Reply if this specific user has an existing thread - thread = existing_threads.get(user_id) - is_reply: bool = thread is not None - - # Skip if daily alert limit reached 
- sent_today: int = daily_count_map.get((user_id, subscription_id), 0) - if subscription.alert_per_day and sent_today >= subscription.alert_per_day: - logger.info(f"Daily alert limit reached for user [{user.get_full_name()}]") - continue - - # Skip duplicate emails for same item - if (user_id, subscription_id) in already_sent: - logger.info(f"Duplicate alert skipped for user [{user.get_full_name()}] " f"with LoadItem ID [{subscription_id}]") - continue - - send_alert_email_notification(load_item=load_item, user=user, subscription=subscription, thread=thread, is_reply=is_reply) diff --git a/alert_system/tests.py b/alert_system/tests.py index 6919544f1..5077d8cf7 100644 --- a/alert_system/tests.py +++ b/alert_system/tests.py @@ -1,12 +1,11 @@ from unittest import mock from uuid import uuid4 -from django.core.management import call_command from django.test import TestCase from django.utils import timezone +from alert_system.email_processing import process_email_alert from alert_system.models import AlertEmailLog, AlertEmailThread, Connector -from alert_system.tasks import process_email_alert from api.factories.country import CountryFactory from api.factories.disaster_type import DisasterTypeFactory from api.factories.region import RegionFactory @@ -61,7 +60,7 @@ def setUp(self): ) self.eligible_item = LoadItemFactory.create( - correlation_id="corr-001", + parent_guid=str(uuid4()), connector=self.connector, item_eligible=True, is_past_event=False, @@ -93,7 +92,7 @@ def setUp(self): }, ) - @mock.patch("alert_system.utils.send_notification") + @mock.patch("alert_system.email_processing.send_notification") def test_sent_email_for_eligible_item(self, mock_send_notification): process_email_alert(self.eligible_item.id) @@ -108,13 +107,13 @@ def test_sent_email_for_eligible_item(self, mock_send_notification): self.assertIsNotNone(log.email_sent_at) self.assertEqual(thread.user, self.user1) - self.assertEqual(thread.correlation_id, self.eligible_item.correlation_id) + 
self.assertEqual(thread.parent_guid, self.eligible_item.parent_guid) self.assertEqual(thread.root_email_message_id, log.message_id) self.assertEqual(log.thread, thread) mock_send_notification.assert_called() - @mock.patch("alert_system.utils.send_notification") + @mock.patch("alert_system.email_processing.send_notification") def test_sent_email_to_multiple_users(self, mock_send_notification): process_email_alert(self.eligible_item.id) @@ -122,7 +121,7 @@ def test_sent_email_to_multiple_users(self, mock_send_notification): logs = AlertEmailLog.objects.filter(item=self.eligible_item, status=AlertEmailLog.Status.SENT) self.assertEqual(logs.count(), 2) - threads = AlertEmailThread.objects.filter(correlation_id=self.eligible_item.correlation_id) + threads = AlertEmailThread.objects.filter(parent_guid=self.eligible_item.parent_guid) self.assertEqual(threads.count(), 2) self.assertEqual(mock_send_notification.call_count, 2) @@ -137,7 +136,7 @@ def test_sent_email_to_multiple_users(self, mock_send_notification): self.assertEqual(user2_log.thread, user2_thread) self.assertNotEqual(user1_thread.root_email_message_id, user2_thread.root_email_message_id) - @mock.patch("alert_system.utils.send_notification") + @mock.patch("alert_system.email_processing.send_notification") def test_daily_email_alert_limit(self, mock_send_notification): user = UserFactory.create(email="t@example.com") @@ -200,7 +199,7 @@ def test_daily_email_alert_limit(self, mock_send_notification): # Test Reply emails - @mock.patch("alert_system.utils.send_notification") + @mock.patch("alert_system.email_processing.send_notification") def test_reply_email_for_existing_thread(self, mock_send_notification): user = UserFactory.create() country = CountryFactory.create( @@ -218,7 +217,7 @@ def test_reply_email_for_existing_thread(self, mock_send_notification): ) initial_item = LoadItemFactory.create( - correlation_id=str(uuid4()), + parent_guid=str(uuid4()), connector=self.connector, item_eligible=True, 
is_past_event=False, @@ -232,7 +231,7 @@ def test_reply_email_for_existing_thread(self, mock_send_notification): thread = AlertEmailThreadFactory.create( user=user, - correlation_id=initial_item.correlation_id, + parent_guid=initial_item.parent_guid, root_email_message_id=str(uuid4()), root_message_sent_at=timezone.now(), ) @@ -248,7 +247,7 @@ def test_reply_email_for_existing_thread(self, mock_send_notification): ) update_item = LoadItemFactory.create( - correlation_id=initial_item.correlation_id, + parent_guid=initial_item.parent_guid, connector=self.connector, item_eligible=True, is_past_event=False, @@ -268,17 +267,17 @@ def test_reply_email_for_existing_thread(self, mock_send_notification): mock_send_notification.assert_called_once() - threads = AlertEmailThread.objects.filter(correlation_id=initial_item.correlation_id) + threads = AlertEmailThread.objects.filter(parent_guid=initial_item.parent_guid) self.assertEqual(threads.count(), 1) - @mock.patch("alert_system.utils.send_notification") + @mock.patch("alert_system.email_processing.send_notification") def test_reply_email_to_multiple_users(self, mock_send_notification): - correlation_id = str(uuid4()) + parent_guid = str(uuid4()) # Create initial item initial_item = LoadItemFactory.create( - correlation_id=correlation_id, + parent_guid=parent_guid, connector=self.connector, item_eligible=True, is_past_event=False, @@ -293,14 +292,14 @@ def test_reply_email_to_multiple_users(self, mock_send_notification): # Create threads for both users thread1 = AlertEmailThreadFactory.create( user=self.user1, - correlation_id=correlation_id, + parent_guid=parent_guid, root_email_message_id="message-id-1", root_message_sent_at=timezone.now(), ) thread2 = AlertEmailThreadFactory.create( user=self.user2, - correlation_id=correlation_id, + parent_guid=parent_guid, root_email_message_id="message-id-2", root_message_sent_at=timezone.now(), ) @@ -326,7 +325,7 @@ def test_reply_email_to_multiple_users(self, mock_send_notification): 
) related_item = LoadItemFactory.create( - correlation_id=correlation_id, + parent_guid=parent_guid, connector=self.connector, item_eligible=True, is_past_event=False, @@ -348,10 +347,10 @@ def test_reply_email_to_multiple_users(self, mock_send_notification): reply_user2 = replies.get(user=self.user2) self.assertNotEqual(reply_user1.message_id, reply_user2.message_id) - @mock.patch("alert_system.utils.send_notification") + @mock.patch("alert_system.email_processing.send_notification") def test_duplicate_reply(self, mock_send_notification): - correlation_id = str(uuid4()) + parent_guid = str(uuid4()) user = UserFactory.create() country = CountryFactory.create( @@ -367,7 +366,7 @@ def test_duplicate_reply(self, mock_send_notification): ) LoadItemFactory.create( - correlation_id=correlation_id, + parent_guid=parent_guid, connector=self.connector, item_eligible=True, is_past_event=False, @@ -381,13 +380,13 @@ def test_duplicate_reply(self, mock_send_notification): thread = AlertEmailThreadFactory.create( user=user, - correlation_id=correlation_id, + parent_guid=parent_guid, root_email_message_id="root-123", root_message_sent_at=timezone.now(), ) update_item = LoadItemFactory.create( - correlation_id=correlation_id, + parent_guid=parent_guid, connector=self.connector, item_eligible=True, is_past_event=False, @@ -430,42 +429,3 @@ def test_duplicate_reply(self, mock_send_notification): self.assertEqual(replies_user2.count(), 1) mock_send_notification.assert_called_once() - - # Test command trigger - @mock.patch("alert_system.tasks.process_email_alert.delay") - def test_command_triggers_task_for_eligible_items(self, mock_task_delay): - """Test that management command queues eligible items""" - call_command("alert_notification") - - mock_task_delay.assert_called_once_with(load_item_id=self.eligible_item.id) - - @mock.patch("alert_system.tasks.process_email_alert.delay") - def test_command_for_ineligible_items(self, mock_task_delay): - - # Delete eligible item - 
self.eligible_item.delete() - - call_command("alert_notification") - - mock_task_delay.assert_not_called() - - @mock.patch("alert_system.tasks.process_email_alert.delay") - def test_command_trigger_for_past_events(self, mock_task_delay): - - self.eligible_item.is_past_event = True - self.eligible_item.save() - - call_command("alert_notification") - - mock_task_delay.assert_not_called() - - @mock.patch("alert_system.utils.send_notification") - def test_email_send_failed(self, mock_send_notification): - - mock_send_notification.side_effect = Exception("Email service error") - - process_email_alert(self.eligible_item.id) - - log = AlertEmailLog.objects.get(user=self.user1, item=self.eligible_item) - self.assertEqual(log.status, AlertEmailLog.Status.FAILED) - self.assertIsNone(log.email_sent_at) diff --git a/alert_system/utils.py b/alert_system/utils.py index e16ba454a..7cdfeb2f9 100644 --- a/alert_system/utils.py +++ b/alert_system/utils.py @@ -1,17 +1,12 @@ import logging -import uuid -from typing import Optional from django.conf import settings from django.contrib.auth.models import User from django.db.models import Q -from django.template.loader import render_to_string -from django.utils import timezone -from alert_system.models import AlertEmailLog, AlertEmailThread, LoadItem +from alert_system.models import LoadItem from api.models import Country from notifications.models import AlertSubscription -from notifications.notification import send_notification logger = logging.getLogger(__name__) @@ -51,73 +46,3 @@ def get_alert_subscriptions(load_item: LoadItem): .select_related("user") .distinct() ) - - -def send_alert_email_notification( - load_item: LoadItem, - user: User, - subscription: AlertSubscription, - thread: Optional[AlertEmailThread], - is_reply: bool = False, -) -> None: - """Helper function to send email and create log entry""" - message_id: str = str(uuid.uuid4()) - - email_log = AlertEmailLog.objects.create( - user=user, - subscription=subscription, - 
item=load_item, - status=AlertEmailLog.Status.PROCESSING, - message_id=message_id, - thread=thread, - ) - - try: - if is_reply: - subject = f"Re: Hazard Alert: {load_item.event_title}" - template = "email/alert_system/alert_notification_reply.html" - email_type = "Alert Email Notification Reply" - in_reply_to = thread.root_email_message_id - else: - subject = f"New Hazard Alert: {load_item.event_title}" - template = "email/alert_system/alert_notification.html" - email_type = "Alert Email Notification" - in_reply_to = None - - email_context = get_alert_email_context(load_item, user) - email_body = render_to_string(template, email_context) - - send_notification( - subject=subject, - recipients=user.email, - message_id=message_id, - in_reply_to=in_reply_to, - html=email_body, - mailtype=email_type, - ) - - email_log.status = AlertEmailLog.Status.SENT - email_log.email_sent_at = timezone.now() - email_log.save(update_fields=["status", "email_sent_at"]) - - # Create thread for initial emails - if not is_reply: - thread = AlertEmailThread.objects.create( - user=user, - correlation_id=load_item.correlation_id, - root_email_message_id=message_id, - root_message_sent_at=timezone.now(), - ) - email_log.thread = thread - email_log.save(update_fields=["thread"]) - logger.info( - f"Alert Email thread created for user [{user.get_full_name()}] " - f"with correlation_id [{load_item.correlation_id}]" - ) - - logger.info(f"Alert email sent to [{user.get_full_name()}] for LoadItem ID [{load_item.id}]") - - except Exception: - email_log.status = AlertEmailLog.Status.FAILED - email_log.save(update_fields=["status"]) - logger.warning(f"Alert email failed for [{user.get_full_name()}] LoadItem ID [{load_item.id}]", exc_info=True)