diff --git a/posthog/consumer.py b/posthog/consumer.py index 021d4343..d4ce068e 100644 --- a/posthog/consumer.py +++ b/posthog/consumer.py @@ -3,8 +3,6 @@ import time from threading import Thread -import backoff - from posthog.request import APIError, DatetimeSerializer, batch_post try: @@ -128,29 +126,41 @@ def next(self): def request(self, batch): """Attempt to upload the batch and retry before raising an error""" - def fatal_exception(exc): + def is_retryable(exc): if isinstance(exc, APIError): # retry on server errors and client errors - # with 429 status code (rate limited), + # with 408 (request timeout) or 429 (rate limited), # don't retry on other client errors if exc.status == "N/A": return False - return (400 <= exc.status < 500) and exc.status != 429 + return not ((400 <= exc.status < 500) and exc.status not in (408, 429)) else: # retry on all other errors (eg. network) - return False - - @backoff.on_exception( - backoff.expo, Exception, max_tries=self.retries + 1, giveup=fatal_exception - ) - def send_request(): - batch_post( - self.api_key, - self.host, - gzip=self.gzip, - timeout=self.timeout, - batch=batch, - historical_migration=self.historical_migration, - ) - - send_request() + return True + + last_exc = None + for attempt in range(self.retries + 1): + try: + batch_post( + self.api_key, + self.host, + gzip=self.gzip, + timeout=self.timeout, + batch=batch, + historical_migration=self.historical_migration, + ) + return + except Exception as e: + last_exc = e + if not is_retryable(e): + raise + if attempt < self.retries: + # Respect Retry-After header if present, otherwise use exponential backoff + retry_after = getattr(e, "retry_after", None) + if retry_after and retry_after > 0: + time.sleep(retry_after) + else: + time.sleep(min(2**attempt, 30)) + + if last_exc: + raise last_exc diff --git a/posthog/request.py b/posthog/request.py index 08211fe5..2dfe0a18 100644 --- a/posthog/request.py +++ b/posthog/request.py @@ -235,12 +235,32 @@ def _process_response( ) raise QuotaLimitError(res.status_code, "Feature flags quota limited") return response + retry_after = None + retry_after_header = res.headers.get("Retry-After") + if retry_after_header: + try: + retry_after = float(retry_after_header) + except (ValueError, TypeError): + try: + from datetime import datetime, timezone + from email.utils import parsedate_to_datetime + + retry_after = max( + 0.0, + ( + parsedate_to_datetime(retry_after_header) + - datetime.now(timezone.utc) + ).total_seconds(), + ) + except (ValueError, TypeError): + pass + try: payload = res.json() log.debug("received response: %s", payload) - raise APIError(res.status_code, payload["detail"]) + raise APIError(res.status_code, payload["detail"], retry_after=retry_after) except (KeyError, ValueError): - raise APIError(res.status_code, res.text) + raise APIError(res.status_code, res.text, retry_after=retry_after) def decide( @@ -348,9 +368,12 @@ def get( class APIError(Exception): - def __init__(self, status: Union[int, str], message: str): + def __init__( + self, status: Union[int, str], message: str, retry_after: Optional[float] = None + ): self.message = message self.status = status + self.retry_after = retry_after def __str__(self): msg = "[PostHog] {0} ({1})" diff --git a/posthog/test/test_consumer.py b/posthog/test/test_consumer.py index 28d51221..a832aa28 100644 --- a/posthog/test/test_consumer.py +++ b/posthog/test/test_consumer.py @@ -167,6 +167,63 @@ def mock_post_fn(_: str, data: str, **kwargs: Any) -> mock.Mock: q.join() self.assertEqual(mock_post.call_count, 2) + def test_request_sleeps_with_retry_after(self) -> None: + error = APIError(429, "Too Many Requests", retry_after=5.0) + call_count = [0] + + def mock_post(*args: Any, **kwargs: Any) -> None: + call_count[0] += 1 + if call_count[0] <= 1: + raise error + + consumer = Consumer(None, TEST_API_KEY, retries=3) + with ( + mock.patch("posthog.consumer.batch_post", side_effect=mock_post), + mock.patch("posthog.consumer.time.sleep") as mock_sleep, + ): + consumer.request([_track_event()]) + mock_sleep.assert_called_once_with(5.0) + + def test_request_uses_exponential_backoff_without_retry_after(self) -> None: + error = APIError(503, "Service Unavailable") + call_count = [0] + + def mock_post(*args: Any, **kwargs: Any) -> None: + call_count[0] += 1 + if call_count[0] <= 3: + raise error + + consumer = Consumer(None, TEST_API_KEY, retries=3) + with ( + mock.patch("posthog.consumer.batch_post", side_effect=mock_post), + mock.patch("posthog.consumer.time.sleep") as mock_sleep, + ): + consumer.request([_track_event()]) + self.assertEqual( + mock_sleep.call_args_list, + [ + mock.call(1), # 2^0 + mock.call(2), # 2^1 + mock.call(4), # 2^2 + ], + ) + + def test_request_retries_on_408(self) -> None: + call_count = [0] + + def mock_post(*args: Any, **kwargs: Any) -> None: + call_count[0] += 1 + if call_count[0] <= 1: + raise APIError(408, "Request Timeout") + + consumer = Consumer(None, TEST_API_KEY, retries=3) + with ( + mock.patch("posthog.consumer.batch_post", side_effect=mock_post), + mock.patch("posthog.consumer.time.sleep"), + ): + consumer.request([_track_event()]) + self.assertEqual(call_count[0], 2) + @parameterized.expand( [ ("on_error_succeeds", False),