11 changes: 10 additions & 1 deletion README.md
@@ -324,7 +324,16 @@ LOGGING = {
```

## Release Notes

- 4.1.7
- `logzio/sender.py`:
- Added `Lock` import and `_flush_lock` instance variable
- Modified `_flush_queue()` to use lock for thread-safe flushing
- Modified `_get_messages_up_to_max_allowed_size()` to use non-blocking `queue.get(block=False)`
- `tests/test_thread_safety.py`: Added comprehensive tests for thread safety
- 4.1.6
- Update dependencies
- 4.1.5
- Update dependencies
- 4.1.4
  - Upgrade trace context opentelemetry-instrumentation-logging package version from v0.39b0 to v0.50b0
- 4.1.3
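
As a rough sketch of the pattern the 4.1.7 entry describes, the drain now happens under a lock and pulls items without blocking. The names below (`pending`, `flush_lock`, `drain_batch`, `flush_pending`) are illustrative stand-ins, not the handler's actual internals:

```python
import queue
from threading import Lock

# Hypothetical stand-ins for the handler's internals, for illustration only.
pending = queue.Queue()
flush_lock = Lock()

def drain_batch(q, max_items=500):
    """Collect whatever is queued right now, never blocking on an empty queue."""
    batch = []
    while len(batch) < max_items:
        try:
            batch.append(q.get(block=False))  # raises queue.Empty instead of hanging
        except queue.Empty:
            break
    return batch

def flush_pending():
    # Only one thread drains at a time; a second concurrent flush waits here
    # rather than racing the first one on empty()/get().
    with flush_lock:
        while True:
            batch = drain_batch(pending)
            if not batch:
                break
            # ... ship `batch` to the backend ...
```

Two concurrent `flush_pending()` calls serialize on the lock, and a queue that empties mid-drain simply ends the batch instead of hanging a consumer.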
139 changes: 72 additions & 67 deletions logzio/sender.py
@@ -4,7 +4,7 @@
import sys
from datetime import datetime
from importlib.metadata import version
-from threading import Thread, enumerate
+from threading import Thread, Lock, enumerate
from time import sleep

import requests
@@ -54,6 +54,7 @@ def __init__(self,

        # Create a queue to hold logs
        self.queue = queue.Queue()
+        self._flush_lock = Lock()
        self._initialize_sending_thread()

    def __del__(self):
@@ -100,79 +101,83 @@ def _drain_queue(self):
            sleep(self.logs_drain_timeout)

    def _flush_queue(self):
        # Sending logs until queue is empty
-        while not self.queue.empty():
-            logs_list = self._get_messages_up_to_max_allowed_size()
-            self.stdout_logger.debug(
-                'Starting to drain %s logs to Logz.io', len(logs_list))
-
-            # Not configurable from the outside
-            sleep_between_retries = self.retry_timeout
-            self.number_of_retries = self.number_of_retries
-
-            should_backup_to_disk = True
-            headers = {"Content-type": "text/plain", **SHIPPER_HEADER}
-
-            for current_try in range(self.number_of_retries):
-                should_retry = False
-                try:
-                    response = self.requests_session.post(
-                        self.url, headers=headers, data='\n'.join(logs_list),
-                        timeout=self.network_timeout)
-                    if response.status_code != 200:
-                        if response.status_code == 400:
-                            self.stdout_logger.info(
-                                'Got 400 code from Logz.io. This means that '
-                                'some of your logs are too big, or badly '
-                                'formatted. response: %s', response.text)
-                            should_backup_to_disk = False
-                            break
-
-                        if response.status_code == 401:
-                            self.stdout_logger.info(
-                                'You are not authorized with Logz.io! Token '
-                                'OK? dropping logs...')
-                            should_backup_to_disk = False
-                            break
-                        else:
-                            self.stdout_logger.info(
-                                'Got %s while sending logs to Logz.io, '
-                                'Try (%s/%s). Response: %s',
-                                response.status_code,
-                                current_try + 1,
-                                self.number_of_retries,
-                                response.text)
-                            should_retry = True
-                    else:
-                        self.stdout_logger.debug(
-                            'Successfully sent bulk of %s logs to '
-                            'Logz.io!', len(logs_list))
-                        should_backup_to_disk = False
-                        break
-                except Exception as e:
-                    self.stdout_logger.warning(
-                        'Got exception while sending logs to Logz.io, '
-                        'Try (%s/%s). Message: %s',
-                        current_try + 1, self.number_of_retries, e)
-                    should_retry = True
-
-                if should_retry:
-                    sleep(sleep_between_retries)
-
-            if should_backup_to_disk and self.backup_logs:
-                # Write to file
-                self.stdout_logger.error(
-                    'Could not send logs to Logz.io after %s tries, '
-                    'backing up to local file system', self.number_of_retries)
-                backup_logs(logs_list, self.stdout_logger)
-
-            del logs_list
+        with self._flush_lock:
+            while not self.queue.empty():
+                logs_list = self._get_messages_up_to_max_allowed_size()
+                if not logs_list:
+                    break
+                self.stdout_logger.debug(
+                    'Starting to drain %s logs to Logz.io', len(logs_list))
+
+                sleep_between_retries = self.retry_timeout
+                self.number_of_retries = self.number_of_retries
+
+                should_backup_to_disk = True
+                headers = {"Content-type": "text/plain", **SHIPPER_HEADER}
+
+                for current_try in range(self.number_of_retries):
+                    should_retry = False
+                    try:
+                        response = self.requests_session.post(
+                            self.url, headers=headers, data='\n'.join(logs_list),
+                            timeout=self.network_timeout)
+                        if response.status_code != 200:
+                            if response.status_code == 400:
+                                self.stdout_logger.info(
+                                    'Got 400 code from Logz.io. This means that '
+                                    'some of your logs are too big, or badly '
+                                    'formatted. response: %s', response.text)
+                                should_backup_to_disk = False
+                                break
+
+                            if response.status_code == 401:
+                                self.stdout_logger.info(
+                                    'You are not authorized with Logz.io! Token '
+                                    'OK? dropping logs...')
+                                should_backup_to_disk = False
+                                break
+                            else:
+                                self.stdout_logger.info(
+                                    'Got %s while sending logs to Logz.io, '
+                                    'Try (%s/%s). Response: %s',
+                                    response.status_code,
+                                    current_try + 1,
+                                    self.number_of_retries,
+                                    response.text)
+                                should_retry = True
+                        else:
+                            self.stdout_logger.debug(
+                                'Successfully sent bulk of %s logs to '
+                                'Logz.io!', len(logs_list))
+                            should_backup_to_disk = False
+                            break
+                    except Exception as e:
+                        self.stdout_logger.warning(
+                            'Got exception while sending logs to Logz.io, '
+                            'Try (%s/%s). Message: %s',
+                            current_try + 1, self.number_of_retries, e)
+                        should_retry = True
+
+                    if should_retry:
+                        sleep(sleep_between_retries)
+
+                if should_backup_to_disk and self.backup_logs:
+                    self.stdout_logger.error(
+                        'Could not send logs to Logz.io after %s tries, '
+                        'backing up to local file system', self.number_of_retries)
+                    backup_logs(logs_list, self.stdout_logger)
+
+                del logs_list

    def _get_messages_up_to_max_allowed_size(self):
        logs_list = []
        current_size = 0
-        while not self.queue.empty():
-            current_log = self.queue.get()
+        while True:
+            try:
+                current_log = self.queue.get(block=False)
+            except queue.Empty:
+                break

            try:
                current_size += sys.getsizeof(current_log)
            except TypeError:
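
The `_get_messages_up_to_max_allowed_size()` change above replaces an `empty()` check followed by a blocking `get()` with a non-blocking `get()`. Below is a minimal, standard-library-only sketch of the race the old sequence allows; all names are illustrative and nothing here uses the handler itself:

```python
import queue
import threading
import time

q = queue.Queue()
q.put("only item")

def stale_consumer():
    if not q.empty():              # the check passes while the item is still there...
        time.sleep(0.5)            # ...but the thread is paused and loses the race
        try:
            # A plain q.get() here would block forever once the queue is empty;
            # a non-blocking (or bounded) get turns the stale check into a miss.
            q.get(timeout=1.0)
        except queue.Empty:
            print("another consumer drained the queue first")

def other_consumer():
    try:
        q.get(block=False)         # takes the only item while the first thread sleeps
    except queue.Empty:
        pass                       # only possible under extreme scheduling delays

t1 = threading.Thread(target=stale_consumer)
t2 = threading.Thread(target=other_consumer)
t1.start()
time.sleep(0.1)
t2.start()
t1.join()
t2.join()
```

With two consumers, the `empty()` result can be stale by the time `get()` runs, so a plain blocking `get()` may wait forever; catching `queue.Empty` from a non-blocking `get()` turns the stale check into a harmless miss, which is what the new loop in `_get_messages_up_to_max_allowed_size()` does.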
6 changes: 4 additions & 2 deletions setup.py
@@ -8,8 +8,10 @@
    description="Logging handler to send logs to your Logz.io account with bulk SSL",
    keywords="logging handler logz.io bulk https",
    author="roiravhon",
-    maintainer="tamir-michaeli",
-    mail="tamir.michaeli@logz.io",
+    mail="help@logz.io",
+    author_email="help@logz.io",
+    maintainer="yotam-loewenbach",
+    maintainer_email="yotam.loewenbach@logz.io",
    url="https://github.com/logzio/logzio-python-handler/",
    license="Apache License 2",
    packages=find_packages(),
150 changes: 150 additions & 0 deletions tests/test_thread_safety.py
@@ -0,0 +1,150 @@
import threading
import time
from unittest import TestCase
from unittest.mock import patch, MagicMock

from logzio.sender import LogzioSender


class TestThreadSafety(TestCase):
    """Tests for thread-safety fixes in LogzioSender.

    These tests verify that:
    1. queue.get() doesn't block indefinitely when queue becomes empty
    2. Concurrent flush calls from multiple threads work correctly
    3. The flush lock prevents race conditions
    """

    @patch('logzio.sender.requests.Session')
    def setUp(self, mock_session):
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_session.return_value.post.return_value = mock_response

        self.sender = LogzioSender(
            token='test-token',
            url='http://localhost:8080',
            logs_drain_timeout=1,
            debug=False,
            backup_logs=False
        )
        self.sender.sending_thread.join(timeout=0.1)

    def test_get_messages_does_not_block_on_empty_queue(self):
        """Test that _get_messages_up_to_max_allowed_size returns immediately on empty queue."""
        while not self.sender.queue.empty():
            self.sender.queue.get(block=False)

        start_time = time.time()
        result = self.sender._get_messages_up_to_max_allowed_size()
        elapsed = time.time() - start_time

        self.assertEqual(result, [])
        self.assertLess(elapsed, 0.1, "Method should return immediately, not block")

    def test_get_messages_returns_available_messages(self):
        """Test that _get_messages_up_to_max_allowed_size returns queued messages."""
        self.sender.queue.put('{"message": "test1"}')
        self.sender.queue.put('{"message": "test2"}')
        self.sender.queue.put('{"message": "test3"}')

        result = self.sender._get_messages_up_to_max_allowed_size()

        self.assertEqual(len(result), 3)
        self.assertIn('{"message": "test1"}', result)

    @patch('logzio.sender.requests.Session')
    def test_concurrent_flush_calls_are_thread_safe(self, mock_session):
        """Test that multiple threads can call flush() without race conditions."""
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_session.return_value.post.return_value = mock_response

        for i in range(100):
            self.sender.queue.put(f'{{"message": "test{i}"}}')

        errors = []

        def flush_worker():
            try:
                self.sender.flush()
            except Exception as e:
                errors.append(e)

        threads = [threading.Thread(target=flush_worker) for _ in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join(timeout=10)

        self.assertEqual(errors, [], f"Flush raised errors: {errors}")
        self.assertTrue(self.sender.queue.empty(), "Queue should be empty after flush")

    @patch('logzio.sender.requests.Session')
    def test_flush_completes_without_blocking(self, mock_session):
        """Test that flush() completes in reasonable time even with concurrent access."""
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_session.return_value.post.return_value = mock_response

        self.sender.queue.put('{"message": "test"}')

        start_time = time.time()
        self.sender.flush()
        elapsed = time.time() - start_time

        self.assertLess(elapsed, 5, "Flush should complete without blocking indefinitely")

    def test_flush_lock_exists(self):
        """Test that the sender has a flush lock for thread synchronization."""
        self.assertTrue(hasattr(self.sender, '_flush_lock'))
        self.assertIsInstance(self.sender._flush_lock, type(threading.Lock()))

    @patch('logzio.sender.requests.Session')
    def test_race_condition_empty_check_then_get(self, mock_session):
        """Test the specific race condition: empty() returns False but get() would block.

        This simulates the scenario where:
        1. Thread A checks queue.empty() -> False
        2. Thread B consumes the last item
        3. Thread A calls queue.get() -> would block forever without fix
        """
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_session.return_value.post.return_value = mock_response

        results = {'completed': False, 'error': None}

        def consumer():
            try:
                time.sleep(0.05)
                while not self.sender.queue.empty():
                    try:
                        self.sender.queue.get(block=False)
                    except:
                        pass
            except Exception as e:
                results['error'] = e

        def flusher():
            try:
                self.sender.flush()
                results['completed'] = True
            except Exception as e:
                results['error'] = e

        self.sender.queue.put('{"message": "test"}')

        consumer_thread = threading.Thread(target=consumer)
        flusher_thread = threading.Thread(target=flusher)

        consumer_thread.start()
        flusher_thread.start()

        flusher_thread.join(timeout=5)
        consumer_thread.join(timeout=1)

        self.assertFalse(flusher_thread.is_alive(),
                         "Flush should complete, not block indefinitely")
        self.assertIsNone(results['error'], f"Error occurred: {results['error']}")