Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 31 additions & 21 deletions posthog/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import time
from threading import Thread

import backoff

from posthog.request import APIError, DatetimeSerializer, batch_post

try:
Expand Down Expand Up @@ -128,29 +126,41 @@ def next(self):
def request(self, batch):
"""Attempt to upload the batch and retry before raising an error"""

def fatal_exception(exc):
def is_retryable(exc):
if isinstance(exc, APIError):
# retry on server errors and client errors
# with 429 status code (rate limited),
# with 408 (request timeout) or 429 (rate limited),
# don't retry on other client errors
if exc.status == "N/A":
return False
return (400 <= exc.status < 500) and exc.status != 429
return not ((400 <= exc.status < 500) and exc.status not in (408, 429))
else:
# retry on all other errors (eg. network)
return False

@backoff.on_exception(
backoff.expo, Exception, max_tries=self.retries + 1, giveup=fatal_exception
)
def send_request():
batch_post(
self.api_key,
self.host,
gzip=self.gzip,
timeout=self.timeout,
batch=batch,
historical_migration=self.historical_migration,
)

send_request()
return True

last_exc = None
for attempt in range(self.retries + 1):
try:
batch_post(
self.api_key,
self.host,
gzip=self.gzip,
timeout=self.timeout,
batch=batch,
historical_migration=self.historical_migration,
)
return
except Exception as e:
last_exc = e
if not is_retryable(e):
raise
if attempt < self.retries:
# Respect Retry-After header if present, otherwise use exponential backoff
retry_after = getattr(e, "retry_after", None)
if retry_after and retry_after > 0:
time.sleep(retry_after)
else:
time.sleep(min(2**attempt, 30))

if last_exc:
raise last_exc
29 changes: 26 additions & 3 deletions posthog/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,32 @@ def _process_response(
)
raise QuotaLimitError(res.status_code, "Feature flags quota limited")
return response
retry_after = None
retry_after_header = res.headers.get("Retry-After")
if retry_after_header:
try:
retry_after = float(retry_after_header)
except (ValueError, TypeError):
try:
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

retry_after = max(
0.0,
(
parsedate_to_datetime(retry_after_header)
- datetime.now(timezone.utc)
).total_seconds(),
)
except (ValueError, TypeError):
pass

try:
payload = res.json()
log.debug("received response: %s", payload)
raise APIError(res.status_code, payload["detail"])
raise APIError(res.status_code, payload["detail"], retry_after=retry_after)
except (KeyError, ValueError):
raise APIError(res.status_code, res.text)
raise APIError(res.status_code, res.text, retry_after=retry_after)


def decide(
Expand Down Expand Up @@ -348,9 +368,12 @@ def get(


class APIError(Exception):
def __init__(self, status: Union[int, str], message: str):
def __init__(
self, status: Union[int, str], message: str, retry_after: Optional[float] = None
):
self.message = message
self.status = status
self.retry_after = retry_after

def __str__(self):
msg = "[PostHog] {0} ({1})"
Expand Down
57 changes: 57 additions & 0 deletions posthog/test/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,63 @@ def mock_post_fn(_: str, data: str, **kwargs: Any) -> mock.Mock:
q.join()
self.assertEqual(mock_post.call_count, 2)

def test_request_sleeps_with_retry_after(self) -> None:
error = APIError(429, "Too Many Requests", retry_after=5.0)
call_count = [0]

def mock_post(*args: Any, **kwargs: Any) -> None:
call_count[0] += 1
if call_count[0] <= 1:
raise error

consumer = Consumer(None, TEST_API_KEY, retries=3)
with (
mock.patch("posthog.consumer.batch_post", side_effect=mock_post),
mock.patch("posthog.consumer.time.sleep") as mock_sleep,
):
consumer.request([_track_event()])
mock_sleep.assert_called_once_with(5.0)

def test_request_uses_exponential_backoff_without_retry_after(self) -> None:
error = APIError(503, "Service Unavailable")
call_count = [0]

def mock_post(*args: Any, **kwargs: Any) -> None:
call_count[0] += 1
if call_count[0] <= 3:
raise error

consumer = Consumer(None, TEST_API_KEY, retries=3)
with (
mock.patch("posthog.consumer.batch_post", side_effect=mock_post),
mock.patch("posthog.consumer.time.sleep") as mock_sleep,
):
consumer.request([_track_event()])
self.assertEqual(
mock_sleep.call_args_list,
[
mock.call(1), # 2^0
mock.call(2), # 2^1
mock.call(4), # 2^2
],
)

def test_request_retries_on_408(self) -> None:
call_count = [0]

def mock_post(*args: Any, **kwargs: Any) -> None:
call_count[0] += 1
if call_count[0] <= 1:
raise APIError(408, "Request Timeout")

consumer = Consumer(None, TEST_API_KEY, retries=3)
with (
mock.patch("posthog.consumer.batch_post", side_effect=mock_post),
mock.patch("posthog.consumer.time.sleep"),
):
consumer.request([_track_event()])
self.assertEqual(call_count[0], 2)

@parameterized.expand(
[
("on_error_succeeds", False),
Expand Down
Loading