Merged
8 changes: 4 additions & 4 deletions alert_system/admin.py
@@ -15,7 +15,7 @@ class EventAdmin(admin.ModelAdmin):
"stac_id",
"created_at",
"collection",
"correlation_id",
"guid",
)
list_filter = ("connector", "collection")
readonly_fields = ("connector",)
@@ -31,7 +31,7 @@ class LoadItemAdmin(admin.ModelAdmin):
"id",
"event_title",
"created_at",
"correlation_id",
"guid",
"item_eligible",
"is_past_event",
)
@@ -56,11 +56,11 @@ class LoadItemAdmin(admin.ModelAdmin):
class AlertEmailThreadAdmin(admin.ModelAdmin):
list_display = (
"user",
"correlation_id",
"parent_guid",
"root_email_message_id",
)
search_fields = (
"correlation_id",
"parent_guid",
"root_email_message_id",
"user__username",
)
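For orientation, a sketch of EventAdmin after this change; only the fields visible in the hunks above are shown, and everything outside them is assumed unchanged:

    class EventAdmin(admin.ModelAdmin):
        list_display = (
            "stac_id",
            "created_at",
            "collection",
            "guid",  # replaces correlation_id in the changelist
        )
        list_filter = ("connector", "collection")
        readonly_fields = ("connector",)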
155 changes: 155 additions & 0 deletions alert_system/email_processing.py
@@ -0,0 +1,155 @@
import logging
import uuid
from typing import Optional

from django.contrib.auth.models import User
from django.db.models import Count
from django.template.loader import render_to_string
from django.utils import timezone

from alert_system.models import AlertEmailLog, AlertEmailThread, LoadItem
from alert_system.utils import get_alert_email_context, get_alert_subscriptions
from notifications.models import AlertSubscription
from notifications.notification import send_notification

logger = logging.getLogger(__name__)


def send_alert_email_notification(
load_item: LoadItem,
user: User,
subscription: AlertSubscription,
thread: Optional[AlertEmailThread],
is_reply: bool = False,
) -> None:
"""Helper function to send email and create log entry"""
message_id: str = str(uuid.uuid4())

email_log = AlertEmailLog.objects.create(
user=user,
subscription=subscription,
item=load_item,
status=AlertEmailLog.Status.PROCESSING,
message_id=message_id,
thread=thread,
)

try:
if is_reply:
subject = f"Re: Hazard Alert: {load_item.event_title}"
template = "email/alert_system/alert_notification_reply.html"
email_type = "Alert Email Notification Reply"
in_reply_to = thread.root_email_message_id
else:
subject = f"New Hazard Alert: {load_item.event_title}"
template = "email/alert_system/alert_notification.html"
email_type = "Alert Email Notification"
in_reply_to = None

email_context = get_alert_email_context(load_item, user)
email_body = render_to_string(template, email_context)

send_notification(
subject=subject,
recipients=user.email,
message_id=message_id,
in_reply_to=in_reply_to,
html=email_body,
mailtype=email_type,
)

email_log.status = AlertEmailLog.Status.SENT
email_log.email_sent_at = timezone.now()
email_log.save(update_fields=["status", "email_sent_at"])

# Create thread for initial emails
if not is_reply:
thread = AlertEmailThread.objects.create(
user=user,
parent_guid=load_item.parent_guid,
root_email_message_id=message_id,
root_message_sent_at=timezone.now(),
)
email_log.thread = thread
email_log.save(update_fields=["thread"])
            logger.info(
                f"Alert Email thread created for user [{user.get_full_name()}] with parent_guid [{load_item.parent_guid}]"
            )

logger.info(f"Alert email sent to [{user.get_full_name()}] for LoadItem ID [{load_item.id}]")

except Exception:
email_log.status = AlertEmailLog.Status.FAILED
email_log.save(update_fields=["status"])
logger.warning(f"Alert email failed for [{user.get_full_name()}] LoadItem ID [{load_item.id}]", exc_info=True)


def process_email_alert(load_item_id: int) -> None:
load_item = LoadItem.objects.select_related("connector", "connector__dtype").filter(id=load_item_id).first()

if not load_item:
logger.warning(f"LoadItem with ID [{load_item_id}] not found")
return

subscriptions = list(get_alert_subscriptions(load_item))
if not subscriptions:
logger.info(f"No alert subscriptions matched for LoadItem ID [{load_item_id}]")
return

today = timezone.now().date()
user_ids = [sub.user_id for sub in subscriptions]
subscription_ids = [sub.id for sub in subscriptions]

# Daily email counts per user
daily_counts = (
AlertEmailLog.objects.filter(
user_id__in=user_ids,
subscription_id__in=subscription_ids,
status=AlertEmailLog.Status.SENT,
email_sent_at__date=today,
)
.values("user_id", "subscription_id")
.annotate(sent_count=Count("id"))
)
daily_count_map = {(item["user_id"], item["subscription_id"]): item["sent_count"] for item in daily_counts}

# Emails already sent for this item (per user)
already_sent = set(
AlertEmailLog.objects.filter(
user_id__in=user_ids,
subscription_id__in=subscription_ids,
item_id=load_item_id,
status=AlertEmailLog.Status.SENT,
).values_list("user_id", "subscription_id")
)

    # Existing threads for this parent_guid
existing_threads = {
thread.user_id: thread
for thread in AlertEmailThread.objects.filter(
parent_guid=load_item.parent_guid,
user_id__in=user_ids,
)
}

for subscription in subscriptions:
user = subscription.user
user_id: int = user.id
subscription_id: int = subscription.id

# Reply if this specific user has an existing thread
thread = existing_threads.get(user_id)
is_reply: bool = thread is not None

# Skip if daily alert limit reached
sent_today: int = daily_count_map.get((user_id, subscription_id), 0)
if subscription.alert_per_day and sent_today >= subscription.alert_per_day:
logger.info(f"Daily alert limit reached for user [{user.get_full_name()}]")
continue

# Skip duplicate emails for same item
if (user_id, subscription_id) in already_sent:
logger.info(f"Duplicate alert skipped for user [{user.get_full_name()}] " f"with LoadItem ID [{subscription_id}]")
continue

send_alert_email_notification(load_item=load_item, user=user, subscription=subscription, thread=thread, is_reply=is_reply)
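For context, a minimal invocation sketch; this PR does not show how process_email_alert is scheduled, so the caller below is purely illustrative:

    # Hypothetical caller -- the scheduling mechanism is not part of this diff.
    from alert_system.email_processing import process_email_alert

    def on_load_item_created(load_item_id: int) -> None:
        # Subscription matching, daily limits, per-item dedup, and reply
        # threading are all handled inside process_email_alert.
        process_email_alert(load_item_id)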
1 change: 1 addition & 0 deletions alert_system/etl/base/config.py
@@ -11,3 +11,4 @@ class ExtractionConfig(TypedDict):
filter_impact: Dict | None

people_exposed_threshold: int
+    forecasted_data: bool
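For illustration, a config dict exercising the new flag; only the keys visible in this hunk are shown, and the values are hypothetical:

    config = {
        # ...remaining ExtractionConfig keys elided...
        "filter_impact": None,
        "people_exposed_threshold": 10000,  # hypothetical threshold
        "forecasted_data": True,            # new: opt in to forecast collections
    }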
80 changes: 32 additions & 48 deletions alert_system/etl/base/extraction.py
@@ -40,6 +40,7 @@ class BaseExtractionClass(ABC):
filter_event: Optional[Dict] = None
filter_hazard: Optional[Dict] = None
filter_impact: Optional[Dict] = None
+    forecasted_data: bool | None = False

config: ExtractionConfig

@@ -86,13 +87,16 @@ def fetch_stac_data(url: str, filters: Optional[Dict] = None, timeout: int | Non
current_payload = None # Only use params on first request

def _get_correlation_id(self, feature: Dict) -> str:
"""Extract correlation ID from feature properties."""
return feature.get("properties", {}).get("monty:corr_id")

+    def _get_guid(self, feature: Dict) -> str:
+        return feature.get("properties", {}).get("monty:guid")

def _build_base_defaults(self, feature: Dict, run_id: str, collection_type: ExtractionItem.CollectionType) -> Dict:
"""Build common default fields for all STAC items."""
return {
"correlation_id": self._get_correlation_id(feature),
"guid": self._get_guid(feature=feature),
"correlation_id": self._get_correlation_id(feature=feature),
"resp_data": feature,
"connector": self.connector,
"extraction_run_id": run_id,
@@ -138,7 +142,8 @@ def _extract_impact_items(self, stac_obj: ExtractionItem, run_id: str) -> List[E
self.base_url,
build_stac_search(
collections=self.impact_collection_type,
-                    correlation_id=stac_obj.correlation_id,
+                    guid=stac_obj.guid,
+                    forecasted_data=self.forecasted_data,
),
)
except Exception as e:
@@ -168,7 +173,7 @@ def _extract_hazard_items(self, stac_obj: ExtractionItem, run_id: str) -> Extrac
self.base_url,
build_stac_search(
collections=self.hazard_collection_type,
-                    correlation_id=stac_obj.correlation_id,
+                    guid=stac_obj.guid,
),
)
except Exception as e:
@@ -189,7 +194,7 @@ def _extract_hazard_items(self, stac_obj: ExtractionItem, run_id: str) -> Extrac
hazard_obj = self._save_stac_item(hazard_id, defaults)
return hazard_obj

-    def process_event_items(self, extraction_run_id: str, correlation_id: str | None = None, is_past_event: bool = False) -> None:
+    def process_event_items(self, extraction_run_id: str, guid: str | None = None, is_past_event: bool = False) -> None:
"""Process all event items from the connector source."""
filters = []
if self.filter_event:
@@ -205,7 +210,7 @@ def process_event_items(self, extraction_run_id: str, correlation_id: str | None
build_stac_search(
collections=self.event_collection_type,
additional_filters=filters,
-                correlation_id=correlation_id,
+                guid=guid,
datetime_range=None if is_past_event else self.get_datetime_filter(),
),
)
@@ -246,10 +251,10 @@ def process_event_items(self, extraction_run_id: str, correlation_id: str | None
logger.warning(f"Failed to process event {event_id}: {e}", exc_info=True)
raise

-    def run(self, extraction_run_id: str, correlation_id: str | None = None, is_past_event: bool = False) -> None:
+    def run(self, extraction_run_id: str, guid: str | None = None, is_past_event: bool = False) -> None:
"""Main entry point for running the connector."""
try:
-            self.process_event_items(extraction_run_id, correlation_id, is_past_event)
+            self.process_event_items(extraction_run_id, guid, is_past_event)
except Exception as e:
logger.warning(f"Connector run failed: {e}", exc_info=True)
raise
@@ -280,24 +285,19 @@ def _country_filter(self, country_codes) -> list[str]:
filters.append(f"({country_cql})")
return filters

-    def _hazard_filter(self, unit: str | None, value: int | None) -> Optional[str]:
-        if not unit or value is None:
-            return None
-        return f"monty:hazard_detail.severity_unit = '{unit}' AND " f"monty:hazard_detail.severity_value >= {value}"
-
-    def _collect_corr_ids(self, features, exclude: str) -> set[str]:
-        corr_ids = set()
+    def _collect_guids(self, features, exclude: str) -> set[str]:
+        guids = set()
         for feature in features or []:
-            corr_id = self.extractor._get_correlation_id(feature)
-            if corr_id and corr_id != exclude:
-                corr_ids.add(corr_id)
-        return corr_ids
+            guid = self.extractor._get_guid(feature)
+            if guid and guid != exclude:
+                guids.add(guid)
+        return guids

-    def find_related_corr_ids(self, load_obj: LoadItem) -> set[str]:
+    def find_related_guids(self, load_obj: LoadItem) -> set[str]:
start = timezone.now() - timedelta(weeks=self.extractor.connector.lookback_weeks)
end = timezone.now()

-        corr_ids = set()
+        guids = set()

if self.extractor.impact_collection_type:
impact_filter = self._impact_filter(load_obj.impact_metadata)
@@ -316,50 +316,34 @@ def find_related_corr_ids(self, load_obj: LoadItem) -> set[str]:
collections=self.extractor.impact_collection_type,
additional_filters=additional_filters,
datetime_range=f"{start.isoformat()}/{end.isoformat()}",
+                    forecasted_data=self.extractor.forecasted_data,
),
)

-        corr_ids |= self._collect_corr_ids(features, load_obj.correlation_id)
-
-        # NOTE: Returns too many correlation_ids.
-        # if self.extractor.hazard_collection_type:
-        #     hazard_filter = self._hazard_filter(
-        #         load_obj.severity_unit,
-        #         load_obj.severity_value,
-        #     )
-        #     features = self.extractor.fetch_stac_data(
-        #         self.base_url,
-        #         build_stac_search(
-        #             collections=self.extractor.hazard_collection_type,
-        #             additional_filters=[hazard_filter],
-        #             datetime_range=f"{start.isoformat()}/{end.isoformat()}",
-        #         ),
-        #     )
-        #     corr_ids |= self._collect_corr_ids(features, load_obj.correlation_id)
-
-        return corr_ids
+        guids |= self._collect_guids(features, load_obj.guid)
+        return guids

def extract_past_events(self, load_obj: LoadItem) -> None:
-        corr_ids = self.find_related_corr_ids(load_obj)
+        guids = self.find_related_guids(load_obj)

-        if not corr_ids:
+        if not guids:
return

-        existing_items = LoadItem.objects.filter(correlation_id__in=corr_ids)
-        existing_map = {i.correlation_id: i for i in existing_items}
+        existing_items = LoadItem.objects.filter(guid__in=guids)
+        existing_map = {i.guid: i for i in existing_items}

related_ids = []

-        for corr_id in corr_ids:
-            item = existing_map.get(corr_id)
+        for guid in guids:
+            item = existing_map.get(guid)

if not item:
self.extractor.run(
extraction_run_id=load_obj.extraction_run_id,
-                    correlation_id=corr_id,
+                    guid=guid,
is_past_event=True,
)
-                item = LoadItem.objects.filter(correlation_id=corr_id).first()
+                item = LoadItem.objects.filter(guid=guid).first()

if item:
related_ids.append(item.id)
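To summarize the new query shape, a hypothetical helper mirroring the hunks above; build_stac_search's full signature is not shown in this diff, so only arguments that appear here are used, and `extractor`, `load_obj`, `start`, and `end` stand in for the objects used in find_related_guids:

    # Hypothetical sketch -- argument names are taken from the hunks above.
    def related_impact_features(extractor, load_obj, start, end):
        payload = build_stac_search(
            collections=extractor.impact_collection_type,
            guid=load_obj.guid,  # lookups are now keyed by monty:guid, not monty:corr_id
            forecasted_data=extractor.forecasted_data,  # new flag threaded from config
            datetime_range=f"{start.isoformat()}/{end.isoformat()}",
        )
        return extractor.fetch_stac_data(extractor.base_url, payload)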