Subject: [PATCH 1/2] feat: added regex strategy handler and TimedMatch for safe match
309f669..313f1c5 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ install: pipenv install --dev test: - pytest -v --cov=./switcher_client --cov-report xml + pytest -v --cov=./switcher_client --cov-report xml --cov-config=.coveragerc cover: coverage html diff --git a/switcher_client/lib/snapshot.py b/switcher_client/lib/snapshot.py index 2a8f82b..efc1585 100644 --- a/switcher_client/lib/snapshot.py +++ b/switcher_client/lib/snapshot.py @@ -6,6 +6,7 @@ from .utils.payload_reader import parse_json, payload_reader from .utils.ipcidr import IPCIDR +from .utils.timed_match import TimedMatch class StrategiesType(Enum): VALUE = "VALUE_VALIDATION" @@ -14,6 +15,7 @@ class StrategiesType(Enum): TIME = "TIME_VALIDATION" PAYLOAD = "PAYLOAD" NETWORK = "NETWORK" + REGEX = "REGEX" class OperationsType(Enum): EXIST = "EXIST" @@ -46,6 +48,8 @@ def process_operation(strategy_config: dict, input_value: str) -> Optional[bool] return __process_payload(operation, values, input_value) case StrategiesType.NETWORK.value: return __process_network(operation, values, input_value) + case StrategiesType.REGEX.value: + return __process_regex(operation, values, input_value) def __process_value(operation: str, values: list, input_value: str) -> Optional[bool]: """ Process VALUE strategy operations.""" @@ -177,6 +181,20 @@ def __process_network_not_exist(input_value: str, values: list, cidr_regex) -> b return len(result) == 0 +def __process_regex(operation: str, values: list, input_value: str) -> Optional[bool]: + """ Process REGEX strategy operations with timeout protection.""" + + match operation: + case OperationsType.EXIST.value: + return TimedMatch.try_match(values, input_value, use_fullmatch=False) + case OperationsType.NOT_EXIST.value: + result = TimedMatch.try_match(values, input_value, use_fullmatch=False) + return not result + case OperationsType.EQUAL.value: + return TimedMatch.try_match(values, input_value, use_fullmatch=True) + case OperationsType.NOT_EQUAL.value: + result = 
TimedMatch.try_match(values, input_value, use_fullmatch=True) + return not result def __parse_datetime(date_str: str): """Parse datetime string that can be either date-only or datetime format.""" diff --git a/switcher_client/lib/utils/timed_match/__init__.py b/switcher_client/lib/utils/timed_match/__init__.py new file mode 100644 index 0000000..e42995b --- /dev/null +++ b/switcher_client/lib/utils/timed_match/__init__.py @@ -0,0 +1,3 @@ +from .timed_match import TimedMatch + +__all__ = ['TimedMatch'] \ No newline at end of file diff --git a/switcher_client/lib/utils/timed_match/timed_match.py b/switcher_client/lib/utils/timed_match/timed_match.py new file mode 100644 index 0000000..2a2818c --- /dev/null +++ b/switcher_client/lib/utils/timed_match/timed_match.py @@ -0,0 +1,252 @@ +import multiprocessing +import signal +import os +import time + +from typing import List, Optional, Any +from dataclasses import dataclass + +from switcher_client.lib.utils.timed_match.worker import TaskType, WorkerResult, WorkerTask, persistent_regex_worker + +# Default constants +DEFAULT_REGEX_MAX_TIME_LIMIT = 3000 # 3 seconds in milliseconds +DEFAULT_REGEX_MAX_BLACKLISTED = 100 + +@dataclass +class Blacklist: + """Represents a blacklisted regex pattern and input combination.""" + patterns: List[str] + input_value: str + +class TimedMatch: + """ + This class provides regex match operations with timeout-based ReDoS protection. + + Operations are executed in isolated processes with configurable timeouts. + Processes that exceed the timeout are terminated, preventing ReDoS attacks. + Failed operations are cached in a blacklist to avoid repeated resource usage. 
+ """ + + _blacklisted: List[Blacklist] = [] + _max_blacklisted: int = DEFAULT_REGEX_MAX_BLACKLISTED + _max_time_limit: float = DEFAULT_REGEX_MAX_TIME_LIMIT / 1000.0 # Convert to seconds + + # Persistent worker management + _worker_process: Optional[multiprocessing.Process] = None + _task_queue: Optional[multiprocessing.Queue] = None + _result_queue: Optional[multiprocessing.Queue] = None + _worker_ctx: Optional[Any] = None + _task_counter: int = 0 + _worker_needs_restart: bool = False + _old_workers_to_cleanup: List[multiprocessing.Process] = [] + + @classmethod + def initialize_worker(cls) -> bool: + """ + Initialize the persistent worker process for regex matching. + + Creates a new worker process with communication queues. If a worker + already exists, it will be terminated before creating a new one. + + Returns: + True if worker was successfully initialized, False otherwise + """ + # Terminate existing worker if any + cls.terminate_worker() + + # Create multiprocessing context + cls._worker_ctx = multiprocessing.get_context('spawn') + + # Create communication queues + cls._task_queue = cls._worker_ctx.Queue() + cls._result_queue = cls._worker_ctx.Queue() + + # Create and start worker process + cls._worker_process = cls._worker_ctx.Process( + target=persistent_regex_worker, + args=(cls._task_queue, cls._result_queue) + ) + if cls._worker_process: + cls._worker_process.start() + + # Reset task counter + cls._task_counter = 0 + + return cls._worker_process is not None and cls._worker_process.is_alive() + + @classmethod + def terminate_worker(cls) -> None: + """ + Terminate all worker processes (current and old ones). + + Sends a shutdown signal to workers and forcefully terminates them if needed. + Cleans up all worker-related resources. 
+ """ + try: + # Terminate current worker + if cls._worker_process and cls._worker_process.is_alive(): + cls._graceful_shutdown() + + # Terminate all old workers waiting for cleanup + cls._terminate_all_old_workers() + finally: + cls._cleanup_resources() + + @classmethod + def _graceful_shutdown(cls) -> None: + """Attempt graceful shutdown of worker process.""" + if cls._task_queue: + shutdown_task = WorkerTask( + task_type=TaskType.SHUTDOWN, + task_id=f"shutdown_{time.time()}" + ) + cls._task_queue.put(shutdown_task, timeout=1.0) + if cls._worker_process: + cls._worker_process.join(timeout=2.0) + + @classmethod + def _cleanup_resources(cls) -> None: + """Clean up all worker-related resources.""" + cls._worker_process = None + cls._task_queue = None + cls._result_queue = None + cls._worker_ctx = None + cls._task_counter = 0 + cls._worker_needs_restart = False + cls._old_workers_to_cleanup.clear() + + @classmethod + def try_match(cls, patterns: List[str], input_value: str, use_fullmatch: bool = False) -> bool: + """ + Executes regex matching operation with timeout protection. + + The operation runs in an isolated process with timeout protection to prevent + runaway regex operations that could lead to ReDoS attacks. + + Failed operations (timeouts, errors) are automatically added to a blacklist + to prevent repeated attempts with the same problematic patterns. 
+ Returns: + True if any of the regex patterns match the input, False otherwise
result from worker.""" + if result.success: + return result.result if result.result is not None else False + else: + cls._add_to_blacklist(patterns, input_value) + return False + + @classmethod + def _is_blacklisted(cls, patterns: List[str], input_value: str) -> bool: + for blacklisted in cls._blacklisted: + # Check if input can contain same segment that could fail matching + if (blacklisted.input_value in input_value or input_value in blacklisted.input_value): + # Check if any of the patterns match (regex order should not affect) + matching_patterns = [p for p in patterns if p in blacklisted.patterns] + if matching_patterns: + return True + return False + + @classmethod + def _add_to_blacklist(cls, patterns: List[str], input_value: str) -> None: + # Maintain blacklist size limit + if len(cls._blacklisted) >= cls._max_blacklisted: + cls._blacklisted.pop(0) # Remove oldest entry + + cls._blacklisted.append(Blacklist( + patterns=patterns.copy(), + input_value=input_value + )) + + @classmethod + def _replace_worker_immediately(cls) -> None: + """Replace worker immediately without waiting for cleanup.""" + # Move current worker to cleanup list if it exists + if cls._worker_process: + cls._old_workers_to_cleanup.append(cls._worker_process) + + # Clear current worker references (but don't cleanup yet) + cls._worker_process = None + cls._task_queue = None + cls._result_queue = None + cls._worker_ctx = None + cls._task_counter = 0 + + # Initialize new worker immediately + cls.initialize_worker() + + @classmethod + def _terminate_all_old_workers(cls) -> None: + """Forcefully terminate all old workers synchronously.""" + for worker in cls._old_workers_to_cleanup[:]: + if worker and worker.is_alive(): + worker.terminate() + worker.join(timeout=1.0) + + cls._old_workers_to_cleanup.clear() + + @classmethod + def clear_blacklist(cls) -> None: + cls._blacklisted.clear() + + @classmethod + def set_max_blacklisted(cls, value: int) -> None: + cls._max_blacklisted = value + + 
@classmethod + def set_max_time_limit(cls, value: int) -> None: + cls._max_time_limit = value / 1000.0 # Convert to seconds \ No newline at end of file diff --git a/switcher_client/lib/utils/timed_match/worker.py b/switcher_client/lib/utils/timed_match/worker.py new file mode 100644 index 0000000..1e900e8 --- /dev/null +++ b/switcher_client/lib/utils/timed_match/worker.py @@ -0,0 +1,103 @@ +import re +import multiprocessing + +from enum import Enum +from typing import List, Optional +from dataclasses import dataclass + +class TaskType(Enum): + """Types of tasks that can be sent to the worker process.""" + MATCH = "match" + SHUTDOWN = "shutdown" + +@dataclass +class WorkerTask: + """Task sent to the worker process.""" + task_type: TaskType + patterns: Optional[List[str]] = None + input_value: Optional[str] = None + use_fullmatch: Optional[bool] = None + task_id: Optional[str] = None + +@dataclass +class WorkerResult: + """Result returned from the worker process.""" + success: bool + result: Optional[bool] = None + task_id: Optional[str] = None + error: Optional[str] = None + +def persistent_regex_worker(task_queue: multiprocessing.Queue, result_queue: multiprocessing.Queue): + """ + Persistent worker function that processes regex matching tasks in a loop. + + This worker runs continuously, processing tasks from the task queue until + it receives a shutdown signal or encounters an error. 
+ + Args: + task_queue: Queue to receive WorkerTask objects + result_queue: Queue to send WorkerResult objects back to main process + """ + try: + while True: + try: + task = task_queue.get(timeout=30.0) + + if task.task_type == TaskType.SHUTDOWN: + result_queue.put(WorkerResult(success=True, task_id=task.task_id)) + break + elif task.task_type == TaskType.MATCH: + result = _process_match_task(task) + result_queue.put(result) + + except Exception: + # Timeout or other error getting task, continue + continue + + except Exception: + # Worker process error, exit + try: + result_queue.put(WorkerResult(success=False, error="Worker process error")) + except Exception: + pass + +def _process_match_task(task: WorkerTask) -> WorkerResult: + """ + Process a regex matching task. + + Args: + task: WorkerTask containing the matching parameters + + Returns: + WorkerResult with the matching result + """ + try: + if not task.patterns or not task.input_value: + return WorkerResult( + success=False, + error="Invalid task parameters", + task_id=task.task_id + ) + + match_result = False + for pattern in task.patterns: + if task.use_fullmatch: + if re.fullmatch(pattern, task.input_value): + match_result = True + break + else: + if re.search(pattern, task.input_value): + match_result = True + break + + return WorkerResult( + success=True, + result=match_result, + task_id=task.task_id + ) + except Exception as e: + return WorkerResult( + success=False, + error=str(e), + task_id=task.task_id + ) \ No newline at end of file diff --git a/tests/strategy-operations/test_regex.py b/tests/strategy-operations/test_regex.py new file mode 100644 index 0000000..6209eb6 --- /dev/null +++ b/tests/strategy-operations/test_regex.py @@ -0,0 +1,125 @@ +import pytest +from typing import Dict, List, Any + +from switcher_client.lib.snapshot import OperationsType, StrategiesType, process_operation +from switcher_client.lib.utils.timed_match import TimedMatch + +class TestRegexStrategy: + """Test suite for 
Strategy [REGEX Safe] tests.""" + + @classmethod + def setup_class(cls): + """Set up TimedMatch before all tests in this class.""" + TimedMatch.set_max_time_limit(1000) # 1000ms = 1 second + TimedMatch.initialize_worker() # Initialize persistent worker + + @classmethod + def teardown_class(cls): + """Clean up TimedMatch after all tests in this class.""" + TimedMatch.terminate_worker() # Terminate any running worker processes + TimedMatch.clear_blacklist() + + @pytest.fixture + def mock_values1(self) -> List[str]: + """Single regex pattern with word boundaries.""" + return [r'\bUSER_[0-9]{1,2}\b'] + + @pytest.fixture + def mock_values2(self) -> List[str]: + """Multiple regex patterns with word boundaries.""" + return [ + r'\bUSER_[0-9]{1,2}\b', + r'\buser-[0-9]{1,2}\b' + ] + + @pytest.fixture + def mock_values3(self) -> List[str]: + """Simple regex pattern without word boundaries.""" + return ['USER_[0-9]{1,2}'] + + def given_strategy_config(self, operation: str, values: List[str]) -> Dict[str, Any]: + return { + 'strategy': StrategiesType.REGEX.value, + 'operation': operation, + 'values': values, + 'activated': True + } + + def test_should_agree_when_expect_to_exist_using_exist_operation(self, mock_values1, mock_values2): + """Should agree when expect to exist using EXIST operation.""" + + strategy_config = self.given_strategy_config(OperationsType.EXIST.value, mock_values1) + result = process_operation(strategy_config, 'USER_1') + assert result is True + + strategy_config = self.given_strategy_config(OperationsType.EXIST.value, mock_values2) + result = process_operation(strategy_config, 'user-01') + assert result is True + + def test_should_not_agree_when_expect_to_exist_using_exist_operation(self, mock_values1, mock_values3): + """Should NOT agree when expect to exist using EXIST operation.""" + + strategy_config = self.given_strategy_config(OperationsType.EXIST.value, mock_values1) + result = process_operation(strategy_config, 'USER_123') + assert result is 
False + + # mock_values3 does not require exact match + strategy_config = self.given_strategy_config(OperationsType.EXIST.value, mock_values3) + result = process_operation(strategy_config, 'USER_123') + assert result is True + + def test_should_agree_when_expect_to_not_exist_using_not_exist_operation(self, mock_values1, mock_values2): + """Should agree when expect to not exist using NOT_EXIST operation.""" + + strategy_config = self.given_strategy_config(OperationsType.NOT_EXIST.value, mock_values1) + result = process_operation(strategy_config, 'USER_123') + assert result is True + + strategy_config = self.given_strategy_config(OperationsType.NOT_EXIST.value, mock_values2) + result = process_operation(strategy_config, 'user-123') + assert result is True + + def test_should_not_agree_when_expect_to_not_exist_using_not_exist_operation(self, mock_values1): + """Should NOT agree when expect to not exist using NOT_EXIST operation.""" + + strategy_config = self.given_strategy_config(OperationsType.NOT_EXIST.value, mock_values1) + result = process_operation(strategy_config, 'USER_12') + assert result is False + + def test_should_agree_when_expect_to_be_equal_using_equal_operation(self, mock_values3): + """Should agree when expect to be equal using EQUAL operation.""" + + strategy_config = self.given_strategy_config(OperationsType.EQUAL.value, mock_values3) + result = process_operation(strategy_config, 'USER_11') + assert result is True + + def test_should_not_agree_when_expect_to_be_equal_using_equal_operation(self, mock_values3): + """Should NOT agree when expect to be equal using EQUAL operation.""" + + strategy_config = self.given_strategy_config(OperationsType.EQUAL.value, mock_values3) + result = process_operation(strategy_config, 'user-11') + assert result is False + + def test_should_agree_when_expect_to_not_be_equal_using_not_equal_operation(self, mock_values3): + """Should agree when expect to not be equal using NOT_EQUAL operation.""" + + strategy_config = 
self.given_strategy_config(OperationsType.NOT_EQUAL.value, mock_values3) + result = process_operation(strategy_config, 'USER_123') + assert result is True + + def test_should_not_agree_when_expect_to_not_be_equal_using_not_equal_operation(self, mock_values3): + """Should NOT agree when expect to not be equal using NOT_EQUAL operation.""" + + strategy_config = self.given_strategy_config(OperationsType.NOT_EQUAL.value, mock_values3) + result = process_operation(strategy_config, 'USER_1') + assert result is False + + def test_should_not_agree_when_match_cannot_finish_redos_attempt(self): + """Should NOT agree when match cannot finish (reDoS attempt).""" + + strategy_config = self.given_strategy_config( + OperationsType.EQUAL.value, + ['^(([a-z])+.)+[A-Z]([a-z])+$'] + ) + result = process_operation(strategy_config, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa') + assert result is False \ No newline at end of file diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/utils/test_timed_match.py b/tests/utils/test_timed_match.py new file mode 100644 index 0000000..dc9f441 --- /dev/null +++ b/tests/utils/test_timed_match.py @@ -0,0 +1,165 @@ +import time + +from switcher_client.lib.utils.timed_match import TimedMatch + +# Test data +OK_RE = "[a-z]" +OK_INPUT = "a" +NOK_RE = "^(([a-z])+.)+[A-Z]([a-z])+$" +NOK_INPUT = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + +COLD_TIME = 500 # ms +WARM_TIME = 50 # ms +TIMEOUT = 950 # ms - 50ms margin for worker thread to finish + + +def get_timer(start_time: float) -> float: + """Calculate elapsed time in milliseconds.""" + return (time.time() - start_time) * 1000 + +class TestTimedMatch: + """Timed-Match tests.""" + + @classmethod + def setup_class(cls): + """Setup before all tests.""" + TimedMatch.initialize_worker() + + def setup_method(self): + """Setup before each test.""" + TimedMatch.clear_blacklist() + TimedMatch.set_max_blacklisted(50) + 
TimedMatch.set_max_time_limit(1000) + + @classmethod + def teardown_class(cls): + """Cleanup after all tests.""" + TimedMatch.terminate_worker() + # Give processes time to fully terminate + time.sleep(0.2) + + def test_should_return_true(self): + """Should return true for simple regex match.""" + + result = TimedMatch.try_match([OK_RE], OK_INPUT) + assert result is True + + def test_should_return_false_and_abort_processing(self): + + """Should return false and abort processing for ReDoS pattern.""" + result = TimedMatch.try_match([NOK_RE], NOK_INPUT) + assert result is False + + def test_runs_stress_tests(self): + """Run timing stress tests.""" + + # First run - cold start + timer = time.time() + TimedMatch.try_match([OK_RE], OK_INPUT) + elapsed = get_timer(timer) + assert elapsed < COLD_TIME + + # ReDoS pattern should timeout + timer = time.time() + TimedMatch.try_match([NOK_RE], NOK_INPUT) + elapsed = get_timer(timer) + assert elapsed > TIMEOUT + + # Another good pattern should be fast + timer = time.time() + TimedMatch.try_match([OK_RE], OK_INPUT) + elapsed = get_timer(timer) + assert elapsed < COLD_TIME + + # Multiple runs should be fast (warm cache) + for _ in range(10): + timer = time.time() + TimedMatch.try_match([OK_RE], OK_INPUT) + elapsed = get_timer(timer) + assert elapsed < WARM_TIME + + def test_should_rotate_blacklist(self): + """Should rotate blacklist when max size is reached.""" + + TimedMatch.set_max_blacklisted(1) + + # First ReDoS pattern times out + timer = time.time() + TimedMatch.try_match([NOK_RE], NOK_INPUT) + elapsed = get_timer(timer) + assert elapsed > TIMEOUT + + # Same pattern should be blacklisted (fast) + timer = time.time() + TimedMatch.try_match([NOK_RE], NOK_INPUT) + elapsed = get_timer(timer) + assert elapsed < WARM_TIME + + # New ReDoS pattern should timeout (replaces blacklist) + timer = time.time() + TimedMatch.try_match([NOK_RE], 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb') + elapsed = get_timer(timer) + assert elapsed > 
TIMEOUT + + # New pattern should now be blacklisted (fast) + timer = time.time() + TimedMatch.try_match([NOK_RE], 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb') + elapsed = get_timer(timer) + assert elapsed < WARM_TIME + + def test_should_capture_blacklisted_item_from_multiple_regex_options(self): + """Should capture blacklisted item from multiple regex options.""" + + TimedMatch.set_max_blacklisted(1) + + # First run with multiple patterns should timeout + timer = time.time() + TimedMatch.try_match([NOK_RE, OK_RE], NOK_INPUT) + elapsed = get_timer(timer) + assert elapsed > TIMEOUT + + # Blacklisted (inverted regex order should still work) + timer = time.time() + TimedMatch.try_match([OK_RE, NOK_RE], NOK_INPUT) + elapsed = get_timer(timer) + assert elapsed < WARM_TIME + + def test_should_capture_blacklisted_item_from_similar_inputs(self): + """Should capture blacklisted item from similar inputs.""" + + TimedMatch.set_max_blacklisted(1) + + # First ReDoS pattern + timer = time.time() + TimedMatch.try_match([NOK_RE, OK_RE], 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa') + elapsed = get_timer(timer) + assert elapsed > TIMEOUT + + # Blacklisted (input slightly different but contains the same evil segment) + timer = time.time() + TimedMatch.try_match([NOK_RE, OK_RE], 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab') + elapsed = get_timer(timer) + assert elapsed < WARM_TIME + + # Same here + timer = time.time() + TimedMatch.try_match([NOK_RE, OK_RE], 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa') + elapsed = get_timer(timer) + assert elapsed < WARM_TIME + + # And here with inverted regex + timer = time.time() + TimedMatch.try_match([OK_RE, NOK_RE], 'aaaaaaaaaaaaaaaaaaaaaaa') + elapsed = get_timer(timer) + assert elapsed < WARM_TIME + + def test_should_reduce_worker_timer(self): + """Should respect reduced worker timer setting.""" + + TimedMatch.set_max_time_limit(500) + + timer = time.time() + TimedMatch.try_match([NOK_RE], NOK_INPUT) + elapsed = get_timer(timer) + 
assert elapsed > 450 + assert elapsed < TIMEOUT From d6fde9ef6ad0b135af7638d0343160ad81a82c86 Mon Sep 17 00:00:00 2001 From: petruki <31597636+petruki@users.noreply.github.com> Date: Fri, 26 Dec 2025 15:01:26 -0800 Subject: [PATCH 2/2] chore: updated sonar exclusion list --- sonar-project.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sonar-project.properties b/sonar-project.properties index 8a26693..3badb1b 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -8,4 +8,4 @@ sonar.sources=switcher_client sonar.tests=tests sonar.python.coverage.reportPaths=coverage.xml sonar.python.version=3.14 -sonar.exclusions=**/tests/**, **/switcher_client/version.py \ No newline at end of file +sonar.exclusions=**/tests/**,**/switcher_client/version.py,**/switcher_client/lib/utils/timed_match/worker.py,**/switcher_client/lib/utils/timed_match/__init__.py \ No newline at end of file