From e8fabea69d057a1c8fe3ff1d8ed30bbd96df6f01 Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Sun, 25 Jan 2026 18:30:44 +0700 Subject: [PATCH 1/5] Refactor LogzioSender to use a lock for thread-safe log flushing --- logzio/sender.py | 139 ++++++++++++++++++++++++----------------------- 1 file changed, 72 insertions(+), 67 deletions(-) diff --git a/logzio/sender.py b/logzio/sender.py index 21596d6..12d5fd8 100644 --- a/logzio/sender.py +++ b/logzio/sender.py @@ -4,7 +4,7 @@ import sys from datetime import datetime from importlib.metadata import version -from threading import Thread, enumerate +from threading import Thread, Lock, enumerate from time import sleep import requests @@ -54,6 +54,7 @@ def __init__(self, # Create a queue to hold logs self.queue = queue.Queue() + self._flush_lock = Lock() self._initialize_sending_thread() def __del__(self): @@ -100,79 +101,83 @@ def _drain_queue(self): sleep(self.logs_drain_timeout) def _flush_queue(self): - # Sending logs until queue is empty - while not self.queue.empty(): - logs_list = self._get_messages_up_to_max_allowed_size() - self.stdout_logger.debug( - 'Starting to drain %s logs to Logz.io', len(logs_list)) - - # Not configurable from the outside - sleep_between_retries = self.retry_timeout - self.number_of_retries = self.number_of_retries - - should_backup_to_disk = True - headers = {"Content-type": "text/plain", **SHIPPER_HEADER} - - for current_try in range(self.number_of_retries): - should_retry = False - try: - response = self.requests_session.post( - self.url, headers=headers, data='\n'.join(logs_list), - timeout=self.network_timeout) - if response.status_code != 200: - if response.status_code == 400: - self.stdout_logger.info( - 'Got 400 code from Logz.io. This means that ' - 'some of your logs are too big, or badly ' - 'formatted. response: %s', response.text) + with self._flush_lock: + while not self.queue.empty(): + logs_list = self._get_messages_up_to_max_allowed_size() + if not logs_list: + break + self.stdout_logger.debug( + 'Starting to drain %s logs to Logz.io', len(logs_list)) + + sleep_between_retries = self.retry_timeout + self.number_of_retries = self.number_of_retries + + should_backup_to_disk = True + headers = {"Content-type": "text/plain", **SHIPPER_HEADER} + + for current_try in range(self.number_of_retries): + should_retry = False + try: + response = self.requests_session.post( + self.url, headers=headers, data='\n'.join(logs_list), + timeout=self.network_timeout) + if response.status_code != 200: + if response.status_code == 400: + self.stdout_logger.info( + 'Got 400 code from Logz.io. This means that ' + 'some of your logs are too big, or badly ' + 'formatted. response: %s', response.text) + should_backup_to_disk = False + break + + if response.status_code == 401: + self.stdout_logger.info( + 'You are not authorized with Logz.io! Token ' + 'OK? dropping logs...') + should_backup_to_disk = False + break + else: + self.stdout_logger.info( + 'Got %s while sending logs to Logz.io, ' + 'Try (%s/%s). Response: %s', + response.status_code, + current_try + 1, + self.number_of_retries, + response.text) + should_retry = True + else: + self.stdout_logger.debug( + 'Successfully sent bulk of %s logs to ' + 'Logz.io!', len(logs_list)) should_backup_to_disk = False break + except Exception as e: + self.stdout_logger.warning( + 'Got exception while sending logs to Logz.io, ' + 'Try (%s/%s). Message: %s', + current_try + 1, self.number_of_retries, e) + should_retry = True - if response.status_code == 401: - self.stdout_logger.info( - 'You are not authorized with Logz.io! Token ' - 'OK? dropping logs...') - should_backup_to_disk = False - break - else: - self.stdout_logger.info( - 'Got %s while sending logs to Logz.io, ' - 'Try (%s/%s). Response: %s', - response.status_code, - current_try + 1, - self.number_of_retries, - response.text) - should_retry = True - else: - self.stdout_logger.debug( - 'Successfully sent bulk of %s logs to ' - 'Logz.io!', len(logs_list)) - should_backup_to_disk = False - break - except Exception as e: - self.stdout_logger.warning( - 'Got exception while sending logs to Logz.io, ' - 'Try (%s/%s). Message: %s', - current_try + 1, self.number_of_retries, e) - should_retry = True - - if should_retry: - sleep(sleep_between_retries) - - if should_backup_to_disk and self.backup_logs: - # Write to file - self.stdout_logger.error( - 'Could not send logs to Logz.io after %s tries, ' - 'backing up to local file system', self.number_of_retries) - backup_logs(logs_list, self.stdout_logger) - - del logs_list + if should_retry: + sleep(sleep_between_retries) + + if should_backup_to_disk and self.backup_logs: + self.stdout_logger.error( + 'Could not send logs to Logz.io after %s tries, ' + 'backing up to local file system', self.number_of_retries) + backup_logs(logs_list, self.stdout_logger) + + del logs_list def _get_messages_up_to_max_allowed_size(self): logs_list = [] current_size = 0 - while not self.queue.empty(): - current_log = self.queue.get() + while True: + try: + current_log = self.queue.get(block=False) + except queue.Empty: + break + try: current_size += sys.getsizeof(current_log) except TypeError: From 6b20222be2b8a73ce466d1cf8085c4460df1648d Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Sun, 25 Jan 2026 18:30:56 +0700 Subject: [PATCH 2/5] thread test --- tests/test_thread_safety.py | 150 ++++++++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 tests/test_thread_safety.py diff --git a/tests/test_thread_safety.py b/tests/test_thread_safety.py new file mode 100644 index 0000000..b769578 --- /dev/null +++ b/tests/test_thread_safety.py @@ -0,0 +1,150 @@ +import threading +import time +from unittest import TestCase +from unittest.mock import patch, MagicMock + +from logzio.sender import LogzioSender + + +class TestThreadSafety(TestCase): + """Tests for thread-safety fixes in LogzioSender. + + These tests verify that: + 1. queue.get() doesn't block indefinitely when queue becomes empty + 2. Concurrent flush calls from multiple threads work correctly + 3. The flush lock prevents race conditions + """ + + @patch('logzio.sender.requests.Session') + def setUp(self, mock_session): + mock_response = MagicMock() + mock_response.status_code = 200 + mock_session.return_value.post.return_value = mock_response + + self.sender = LogzioSender( + token='test-token', + url='http://localhost:8080', + logs_drain_timeout=1, + debug=False, + backup_logs=False + ) + self.sender.sending_thread.join(timeout=0.1) + + def test_get_messages_does_not_block_on_empty_queue(self): + """Test that _get_messages_up_to_max_allowed_size returns immediately on empty queue.""" + while not self.sender.queue.empty(): + self.sender.queue.get(block=False) + + start_time = time.time() + result = self.sender._get_messages_up_to_max_allowed_size() + elapsed = time.time() - start_time + + self.assertEqual(result, []) + self.assertLess(elapsed, 0.1, "Method should return immediately, not block") + + def test_get_messages_returns_available_messages(self): + """Test that _get_messages_up_to_max_allowed_size returns queued messages.""" + self.sender.queue.put('{"message": "test1"}') + self.sender.queue.put('{"message": "test2"}') + self.sender.queue.put('{"message": "test3"}') + + result = self.sender._get_messages_up_to_max_allowed_size() + + self.assertEqual(len(result), 3) + self.assertIn('{"message": "test1"}', result) + + @patch('logzio.sender.requests.Session') + def test_concurrent_flush_calls_are_thread_safe(self, mock_session): + """Test that multiple threads can call flush() without race conditions.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_session.return_value.post.return_value = mock_response + + for i in range(100): + self.sender.queue.put(f'{{"message": "test{i}"}}') + + errors = [] + + def flush_worker(): + try: + self.sender.flush() + except Exception as e: + errors.append(e) + + threads = [threading.Thread(target=flush_worker) for _ in range(5)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=10) + + self.assertEqual(errors, [], f"Flush raised errors: {errors}") + self.assertTrue(self.sender.queue.empty(), "Queue should be empty after flush") + + @patch('logzio.sender.requests.Session') + def test_flush_completes_without_blocking(self, mock_session): + """Test that flush() completes in reasonable time even with concurrent access.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_session.return_value.post.return_value = mock_response + + self.sender.queue.put('{"message": "test"}') + + start_time = time.time() + self.sender.flush() + elapsed = time.time() - start_time + + self.assertLess(elapsed, 5, "Flush should complete without blocking indefinitely") + + def test_flush_lock_exists(self): + """Test that the sender has a flush lock for thread synchronization.""" + self.assertTrue(hasattr(self.sender, '_flush_lock')) + self.assertIsInstance(self.sender._flush_lock, type(threading.Lock())) + + @patch('logzio.sender.requests.Session') + def test_race_condition_empty_check_then_get(self, mock_session): + """Test the specific race condition: empty() returns False but get() would block. + + This simulates the scenario where: + 1. Thread A checks queue.empty() -> False + 2. Thread B consumes the last item + 3. Thread A calls queue.get() -> would block forever without fix + """ + mock_response = MagicMock() + mock_response.status_code = 200 + mock_session.return_value.post.return_value = mock_response + + results = {'completed': False, 'error': None} + + def consumer(): + try: + time.sleep(0.05) + while not self.sender.queue.empty(): + try: + self.sender.queue.get(block=False) + except: + pass + except Exception as e: + results['error'] = e + + def flusher(): + try: + self.sender.flush() + results['completed'] = True + except Exception as e: + results['error'] = e + + self.sender.queue.put('{"message": "test"}') + + consumer_thread = threading.Thread(target=consumer) + flusher_thread = threading.Thread(target=flusher) + + consumer_thread.start() + flusher_thread.start() + + flusher_thread.join(timeout=5) + consumer_thread.join(timeout=1) + + self.assertFalse(flusher_thread.is_alive(), + "Flush should complete, not block indefinitely") + self.assertIsNone(results['error'], f"Error occurred: {results['error']}") + From 69bd7a55e688079e73a219e4db1156e59b1cfb52 Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Sun, 25 Jan 2026 18:57:21 +0700 Subject: [PATCH 3/5] Update README.md --- README.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index a53f129..05d243a 100644 --- a/README.md +++ b/README.md @@ -324,7 +324,16 @@ LOGGING = { ``` ## Release Notes - +- 4.1.7 + - `logzio/sender.py`: + - Added `Lock` import and `_flush_lock` instance variable + - Modified `_flush_queue()` to use lock for thread-safe flushing + - Modified `_get_messages_up_to_max_allowed_size()` to use non-blocking `queue.get(block=False)` + - `tests/test_thread_safety.py`: Added comprehensive tests for thread safety +- 4.1.6 + - Update dependencies +- 4.1.5 + - Update dependencies - 4.1.4 - Upgrade trace context opentelemetry-instrumentation-logging package version from v0.39b0 to v4.50b0 - 4.1.3 From b6706ce58a6b3d10b90730f80e674e8f4baf88d6 Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Sun, 25 Jan 2026 18:57:24 +0700 Subject: [PATCH 4/5] Update setup.py --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index cb86efa..42f3d01 100644 --- a/setup.py +++ b/setup.py @@ -8,8 +8,8 @@ description="Logging handler to send logs to your Logz.io account with bulk SSL", keywords="logging handler logz.io bulk https", author="roiravhon", - maintainer="tamir-michaeli", - mail="tamir.michaeli@logz.io", + maintainer="yotam-loewenbach", + mail="yotam.loewenbach@logz.io", url="https://github.com/logzio/logzio-python-handler/", license="Apache License 2", packages=find_packages(), From 8c8add5e407b391c8d27aabc5dec8ac3ee22a830 Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Mon, 26 Jan 2026 15:01:35 +0700 Subject: [PATCH 5/5] Update setup.py --- setup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 42f3d01..e947a5d 100644 --- a/setup.py +++ b/setup.py @@ -8,8 +8,10 @@ description="Logging handler to send logs to your Logz.io account with bulk SSL", keywords="logging handler logz.io bulk https", author="roiravhon", + mail="help@logz.io", + author_email="help@logz.io", maintainer="yotam-loewenbach", - mail="yotam.loewenbach@logz.io", + maintainer_email="yotam.loewenbach@logz.io", url="https://github.com/logzio/logzio-python-handler/", license="Apache License 2", packages=find_packages(),