diff --git a/.flake8 b/.flake8 new file mode 100755 index 0000000..6deafc2 --- /dev/null +++ b/.flake8 @@ -0,0 +1,2 @@ +[flake8] +max-line-length = 120 diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml new file mode 100644 index 0000000..8b7c012 --- /dev/null +++ b/.github/workflows/python-package.yml @@ -0,0 +1,45 @@ +name: Gradual + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + build: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.10", "3.11", "3.12"] + + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install build pytest pytest-cov + pip install -e . + pip install -e .[dev] + + # - name: Lint with flake8 + # run: | + # flake8 src tests + + - name: Type check with mypy + run: | + mypy src + + - name: Test with pytest + run: | + pytest --cov=src tests/ + + - name: Build package + run: | + python -m build diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e40e99b --- /dev/null +++ b/.gitignore @@ -0,0 +1,44 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual Environment +venv/ +env/ +ENV/ + +# Testing +.coverage +htmlcov/ +.pytest_cache/ + +# IDE specific files +.idea/ +.vscode/ +*.swp +*.swo + +# Documentation builds +docs/_build/ + +# Logs +server_logs/ +logs/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100755 index 0000000..f9fbd6a --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,33 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.4.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - id: check-added-large-files + + - repo: https://github.com/psf/black + rev: 23.10.0 + hooks: + - id: black + language_version: python3 + + - repo: https://github.com/pycqa/isort + rev: 5.12.0 + hooks: + - id: isort + + - repo: https://github.com/pycqa/flake8 + rev: 6.1.0 + hooks: + - id: flake8 + additional_dependencies: [flake8-bugbear] + + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.6.0 + hooks: + - id: mypy + additional_dependencies: + - types-requests + - types-PyYAML diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d520313 --- /dev/null +++ b/Makefile @@ -0,0 +1,61 @@ +SHELL := /bin/bash + +VENV_DIR = venv + +ifeq ($(OS),Windows_NT) + VENV_PYTHON = $(VENV_DIR)\Scripts\python.exe + VENV_PIP = $(VENV_DIR)\Scripts\pip.exe + DEL = rmdir /S /Q +else + VENV_PYTHON = $(VENV_DIR)/bin/python + VENV_PIP = $(VENV_DIR)/bin/pip + DEL = rm -rf +endif + +setup: + @if [ ! 
-d "$(VENV_DIR)" ]; then \ + echo "Creating new virtual environment..."; \ + python -m venv $(VENV_DIR); \ + else \ + echo "Using existing virtual environment..."; \ + fi + + $(VENV_PYTHON) -m pip install --upgrade pip setuptools wheel + $(VENV_PYTHON) -m pip install -e ".[dev,bokeh,websockets]" + +shell: +ifeq ($(OS),Windows_NT) + @cmd /k "venv\Scripts\activate" +else + @bash --rcfile <(echo "source venv/bin/activate") +endif + +develop: + pip install -e ".[dev,bokeh,websockets]" + +build: + $(VENV_PYTHON) -m build + +test: + pytest -v -s --log-cli-level=INFO + +clean: + $(DEL) build dist *.egg-info +ifeq ($(OS),Windows_NT) + @for /d %%D in (.) do if exist "%%D\__pycache__" $(DEL) "%%D\__pycache__" + @del /S /Q *.pyc 2>nul || true +else + @find . -type d -name "__pycache__" -exec $(DEL) {} + + @find . -name "*.pyc" -delete +endif + +docs: + cd docs && $(MAKE) html + +lint: + flake8 src tests + mypy src + +format: + black src tests examples + isort src tests examples diff --git a/README.md b/README.md new file mode 100644 index 0000000..2059bdf --- /dev/null +++ b/README.md @@ -0,0 +1,188 @@ +# Stress Testing Framework + +General Python Stress Testing Framework + +## Quick Installation + +```bash +# Clone the repository +git clone https://github.com/Gradual-Load-Testing/Gradual.git +cd gradual + +# Set up virtual environment and install dependencies +make setup + +# Alternatively, install directly +pip install -e ".[dev,bokeh,websockets,notebook]" + + +# Optional: Install authentication dependencies +pip install -e ".[auth]" # Install all authentication methods +pip install -e ".[kerberos]" # Install only Kerberos authentication +``` + +## Building the Package + +```bash +# Build the package +make build + +# Or manually +python -m build +``` + +## Usage Examples + +### Basic Example + +### Using Command Line Interface + +```bash +# Run a stress test using a YAML configuration +stress-run examples/api_test.yaml --users 500 --duration 60 + +# Start the monitoring dashboard +stress-dashboard --mode websocket # or --mode bokeh +``` + +## Development Guide + +### Setup Development Environment + +```bash +# Clone the repository +git clone https://github.com/Gradual-Load-Testing/Gradual.git +cd gradual + +# Install development dependencies +make setup + +# Only for the first time +pre-commit install + +# Run tests +make test + +# Format code +make format + +# Run linting +make lint +``` + +### Project Structure + +```text +gradual/ +├── benchmarks/ # Benchmark configurations +├── docs/ # Documentation +├── examples/ # Example scenarios and usage +├── notebooks/ # Jupyter notebooks for development +├── results/ # Test results output directory +├── src/ # Source code +│ └── gradual/ +├── tests/ # Test suite +├── .gitignore # Git ignore file +├── LICENSE # License file +├── Makefile # Build and development commands +├── pyproject.toml # Project configuration +├── README.md # This file +├── requirements.txt # Dependencies +└── setup.py # Setup script +``` + +### Adding Dependencies + +When adding new packages to the project, update the following files: + +1. **pyproject.toml**: Add the package to the appropriate section: + + ```toml + # For core dependencies + [project] + + dependencies = [ + # Existing dependencies... 
+ "new-package>=1.0.0", + ] + + + # For optional dependencies + [project.optional-dependencies] + auth = [ + "requests_kerberos>=0.14.0", # For Kerberos authentication + "requests_ntlm>=1.2.0", # For NTLM authentication + "requests_oauthlib>=1.3.1", # For OAuth authentication + ] + kerberos = [ + "requests_kerberos>=0.14.0", # For Kerberos authentication only + ] + ``` + +2. **requirements.txt**: Add core dependencies with version constraints. + + + ```text + new-package>=1.0.0 + ``` + + +3. After updating these files, install the dependencies: + + + ```bash + # Activate the virtual environment if not already activated + source .venv/bin/activate # On Unix/MacOS + # OR + .venv\Scripts\activate # On Windows + + # Install core dependencies + pip install -e . + + # Install optional dependencies + pip install -e ".[auth]" # For all authentication methods + pip install -e ".[kerberos]" # For Kerberos authentication only + + ``` + +4. If the package is only needed for development, testing, or documentation: + - Add it to the appropriate section in `pyproject.toml`: + + ```toml + [project.optional-dependencies] + dev = [ + # Existing dev dependencies... + "new-dev-package>=1.0.0", + ] + ``` + + - Install it with: + + ```bash + pip install -e ".[dev]" # For dev dependencies + # OR + pip install -e ".[docs]" # For documentation dependencies + ``` + +5. Update build and CI configurations if necessary (e.g., `.github/workflows/python-package.yml`). + +6. Commit your changes to version control: + + ```bash + git add pyproject.toml requirements.txt + git commit -m "Add new-package dependency" + ``` + +## Contributing + +Contributions are welcome! Please feel free to submit a Pull Request. + +1. Fork the repository +2. Create your feature branch (`git checkout -b feature/amazing-feature`) +3. Commit your changes (`git commit -m 'Add some amazing feature'`) +4. Push to the branch (`git push origin feature/amazing-feature`) +5. Open a Pull Request + +## License + +This project is licensed under the MIT License - see the LICENSE file for details. diff --git a/benchmarks/sample_benchmarks.yaml b/benchmarks/sample_benchmarks.yaml new file mode 100755 index 0000000..e69de29 diff --git a/docs/dev_guide.md b/docs/dev_guide.md new file mode 100755 index 0000000..e69de29 diff --git a/docs/user_guide.md b/docs/user_guide.md new file mode 100755 index 0000000..e69de29 diff --git a/examples/fastapi_app/Dockerfile b/examples/fastapi_app/Dockerfile new file mode 100644 index 0000000..cc9bf97 --- /dev/null +++ b/examples/fastapi_app/Dockerfile @@ -0,0 +1,9 @@ +FROM python:3.10-slim + +WORKDIR /app + +COPY app ./app +COPY requirements.txt . + +RUN pip install --upgrade pip && \ + pip install -r requirements.txt diff --git a/examples/fastapi_app/README.md b/examples/fastapi_app/README.md new file mode 100644 index 0000000..2df6c8c --- /dev/null +++ b/examples/fastapi_app/README.md @@ -0,0 +1,89 @@ +# 🚀 FastAPI Server - Development Guide with Docker + +This FastAPI server is used for stress testing. It supports **hot reload**, logs all incoming requests to disk, and runs inside a Docker container with logs accessible on the host. + +--- + +## 📦 Requirements + +- [Docker](https://www.docker.com/) +- [Docker Compose](https://docs.docker.com/compose/) + +--- + +## 📁 Project Structure + +``` +. 
+├── app/ +│ ├── main.py # FastAPI app entrypoint +│ └── logger.py # Logger that writes to /app/logs +├── Dockerfile +├── docker-compose.yml +├── requirements.txt +└── server_logs/ # Logs appear here (auto-created if missing) +``` + +--- + +## 🐳 Run the FastAPI Server + +```bash +docker-compose up --build +``` + +This will: +- Build the Docker image. +- Start the server on `http://localhost:8000`. +- Enable **auto-reloading** on code changes inside the `app/` directory. +- Store logs in `./server_logs/api_requests.log`. + +--- + +## 🔁 Auto-Reload for Development + +The server runs with `uvicorn --reload`, so any changes inside the `app/` directory will automatically restart the server. + +> Note: If you rename or add new Python modules, the server may need a manual restart. + +--- + +## 📄 Logging + +All HTTP requests are logged to: +``` +./server_logs/api_requests.log +``` + +This is helpful for verifying server behavior during integration or stress testing. + +--- + +## 🧪 Test Endpoints + +You can test the server with: + +```bash +curl http://localhost:8000/ping +curl http://localhost:8000/data +curl -X POST http://localhost:8000/submit -H "Content-Type: application/json" -d '{"key": "value"}' +``` + +Logs for each request will be appended to the log file. + +--- + +## 🧹 Clean Up + +To stop and remove the container: + +```bash +docker-compose down +``` + +To rebuild everything from scratch: + +```bash +docker-compose down --volumes --rmi all +docker-compose up --build +``` \ No newline at end of file diff --git a/examples/fastapi_app/app/__init__.py b/examples/fastapi_app/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/fastapi_app/app/logger.py b/examples/fastapi_app/app/logger.py new file mode 100644 index 0000000..8f79210 --- /dev/null +++ b/examples/fastapi_app/app/logger.py @@ -0,0 +1,26 @@ +import logging +from logging.handlers import RotatingFileHandler +from pathlib import Path + + +def setup_logger(): + log_dir = Path("logs") + Path.mkdir(log_dir, exist_ok=True, parents=True) + + logger = logging.getLogger("api_logger") + logger.setLevel(logging.INFO) + + log_file = log_dir / "api_requests.log" + + # Rotate every 5 MB, keep last 5 log files + handler = RotatingFileHandler( + log_file, maxBytes=5 * 1024 * 1024, backupCount=5, encoding="utf-8" + ) + + formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + handler.setFormatter(formatter) + + if not logger.handlers: + logger.addHandler(handler) + + return logger diff --git a/examples/fastapi_app/app/main.py b/examples/fastapi_app/app/main.py new file mode 100644 index 0000000..0bef6ca --- /dev/null +++ b/examples/fastapi_app/app/main.py @@ -0,0 +1,27 @@ +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse +from .logger import setup_logger + +app = FastAPI() +logger = setup_logger() + + +@app.get("/ping") +async def ping(): + logger.info("Received ping.") + return {"message": "pong"} + + +@app.get("/data") +async def get_data(): + logger.info("Serving sample data") + return {"data": "Sample payload for stress testing"} + + +@app.post("/submit") +async def submit_data(request: Request): + payload = await request.json() + logger.info(f"Data received: {payload}") + return JSONResponse( + status_code=201, content={"message": "Data received", "data": payload} + ) diff --git a/examples/fastapi_app/docker-compose.yml b/examples/fastapi_app/docker-compose.yml new file mode 100644 index 0000000..cf943ab --- /dev/null +++ 
b/examples/fastapi_app/docker-compose.yml @@ -0,0 +1,17 @@ +version: '3.8' + +services: + fastapi-server: + build: + context: . + dockerfile: Dockerfile + ports: + - "8000:8000" + volumes: + - ./app:/app/app # Live reload code edits (dev mode) + - ./server_logs:/app/logs # Persist and access logs + command: > + uvicorn app.main:app + --host 0.0.0.0 + --port 8000 + --reload diff --git a/examples/fastapi_app/requirements.txt b/examples/fastapi_app/requirements.txt new file mode 100644 index 0000000..f0615cf --- /dev/null +++ b/examples/fastapi_app/requirements.txt @@ -0,0 +1,2 @@ +fastapi +uvicorn \ No newline at end of file diff --git a/examples/fastapi_app/stress_test_configs/request_config.yaml b/examples/fastapi_app/stress_test_configs/request_config.yaml new file mode 100644 index 0000000..318a4c7 --- /dev/null +++ b/examples/fastapi_app/stress_test_configs/request_config.yaml @@ -0,0 +1,22 @@ +requests: + "request1": + url: "http://localhost:8000/ping" + method: "GET" + expected_response_time: 1 + auth: null + "request2": + url: "http://localhost:8000/data" + method: "GET" + expected_response_time: 1 + auth: null + "request3": + url: "http://localhost:8000/submit" + method: "POST" + expected_response_time: 1 + auth: null + params: + name: "John Doe" + email: "john.doe@example.com" + age: 30 + city: "New York" + country: "USA" diff --git a/examples/fastapi_app/stress_test_configs/test_config.yaml b/examples/fastapi_app/stress_test_configs/test_config.yaml new file mode 100755 index 0000000..4bfc82a --- /dev/null +++ b/examples/fastapi_app/stress_test_configs/test_config.yaml @@ -0,0 +1,30 @@ +runs: + name: "Test Run" + wait_between_phases: 10 + phases: + "phase1": + scenarios: + "scenario1": + requests: + - "request1" + - "request2" + min_concurrency: 1 + max_concurrency: 10 + ramp_up_multiply: + - 1 + - 2 + - 3 + ramp_up_wait: + - 1 + - 2 + - 3 + iterate_through_requests: true + "scenario2": + requests: + - "request3" + min_concurrency: 1 + max_concurrency: 10 + ramp_up_multiply: 1 + ramp_up_wait: 1 + iterate_through_requests: true + run_time: 10 diff --git a/examples/postgres/Dockerfile b/examples/postgres/Dockerfile new file mode 100644 index 0000000..ee5162e --- /dev/null +++ b/examples/postgres/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.10-slim + +WORKDIR /workdir + +COPY requirements.txt . + +RUN pip install --upgrade pip && \ + pip install -r requirements.txt + +CMD ["python", "/app/connector.py"] diff --git a/examples/postgres/docker-compose.yml b/examples/postgres/docker-compose.yml new file mode 100644 index 0000000..d30e73b --- /dev/null +++ b/examples/postgres/docker-compose.yml @@ -0,0 +1,24 @@ + +version: '3.8' + +services: + postgres: + image: postgres + container_name: stress_postgres + restart: unless-stopped + environment: + POSTGRES_USER: stress_user + POSTGRES_PASSWORD: stress_password + POSTGRES_DB: stress_db + ports: + - "5432:5432" + volumes: + - ./data:/var/lib/postgresql/data + connector: + build: + context: . 
+ dockerfile: Dockerfile + container_name: stress_postgres_connector + volumes: + - ./workdir:/app + working_dir: /app diff --git a/examples/postgres/requirements.txt b/examples/postgres/requirements.txt new file mode 100644 index 0000000..37ec460 --- /dev/null +++ b/examples/postgres/requirements.txt @@ -0,0 +1 @@ +psycopg2-binary diff --git a/examples/postgres/workdir/__init__.py b/examples/postgres/workdir/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/postgres/workdir/connector.py b/examples/postgres/workdir/connector.py new file mode 100644 index 0000000..dde4c87 --- /dev/null +++ b/examples/postgres/workdir/connector.py @@ -0,0 +1,12 @@ +import psycopg2 + + +conn = psycopg2.connect( + host="postgres", + port=5432, + database="stress_db", + user="stress_user", + password="stress_password", +) + +print(conn) diff --git a/notebooks/getting_started.ipynb b/notebooks/getting_started.ipynb new file mode 100755 index 0000000..e69de29 diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..d17e825 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,131 @@ +[build-system] +requires = ["setuptools>=42", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "gradual" +version = "0.1.0" +description = "A stress testing framework for applications and systems" +readme = "README.md" +authors = [ + { name = "Subham Agrawal", email = "subhamagrawal7@gmail.com" }, + { name = "Aditya Khandelwal", email = "adityakedawat@gmail.com" }, +] +license = "MIT" +license-files = ["LICENSE"] +classifiers = [ + "Programming Language :: Python :: 3", + "Operating System :: OS Independent", + "Topic :: Software Development :: Testing", + "Topic :: Software Development :: Testing :: Traffic Generation", + "Topic :: System :: Benchmark", +] +requires-python = ">=3.9" +dependencies = [ + # Core dependencies + "gevent>=22.10.2", # For concurrency + "websocket-client>=1.6.0", # For WebSocket support + "tabulate>=0.9.0", # Tabulate data in markdown table + "requests>=2.31.0", # For HTTP requests +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.3.0", + "black>=23.3.0", + "isort>=5.12.0", + "flake8>=6.0.0", + "mypy>=1.2.0", + "pre-commit>=4.2.0", + "types-requests>=2.0.0", + "types-gevent>=24.0.0", + "types-PyYAML>=6.0.0", + "build>=0.10.0", # For package building +] +dashboard = [ + "fastapi>=0.95.0", # For API components + "uvicorn>=0.21.0", # ASGI server + "pydantic>=1.10.0", # Data validation for FastAPI +] +visualization = [ + "bokeh>=3.1.0", # For Bokeh visualization +] +metrics = [ + "prometheus-client>=0.16.0", # For Prometheus metrics + "statsd>=4.0.0", # For StatsD metrics +] +distributed = [ + "redis>=4.5.0", # For distributed testing coordination + "celery>=5.2.0", # For distributed task execution +] +auth = [ + "requests_kerberos>=0.14.0", # For Kerberos authentication + "requests_ntlm>=1.2.0", # For NTLM authentication + "requests_oauthlib>=1.3.1", # For OAuth authentication +] +kerberos = [ + "requests_kerberos>=0.14.0", # For Kerberos authentication only +] +integration = [ + "fastapi>=0.95.0", # For API components + "uvicorn>=0.21.0", # ASGI server + "pydantic>=1.10.0", # Data validation for FastAPI +] + +[project.scripts] +stress-dashboard = "gradual.dashboard:main" +stress-run = "gradual.cli:main" + +[project.urls] +"Homepage" = "https://github.com/Gradual-Load-Testing/gradual" +"Bug Tracker" = "https://github.com/Gradual-Load-Testing/gradual/issues" + +[tool.setuptools] +package-dir = { "" = "src" } +packages = ["gradual"] + 
+[tool.pytest.ini_options] +testpaths = ["tests"] +log_cli = true +log_cli_level = "INFO" +log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)" +log_cli_date_format = "%Y-%m-%d %H:%M:%S" + +[tool.black] +line-length = 88 +target-version = ['py310', 'py311', 'py312'] +include = '\.pyi?$' +exclude = ''' +/( + \.git + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | _build + | buck-out + | build + | dist +)/ +''' + +[tool.isort] +profile = "black" +line_length = 88 +multi_line_output = 3 + +[tool.mypy] +python_version = "3.10" +disallow_untyped_defs = false +disallow_incomplete_defs = true +check_untyped_defs = false +disallow_untyped_decorators = true +no_implicit_optional = true +strict_optional = true +warn_redundant_casts = true +warn_return_any = true +warn_unused_ignores = true +disable_error_code = ["no-untyped-def", "import-untyped", "call-arg"] +[[tool.mypy.overrides]] +module = ["requests_kerberos"] +ignore_missing_imports = true diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e79ebe2 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,27 @@ +# Core dependencies +gevent>=22.10.2 +websocket-client>=1.6.0 +tabulate>=0.9.0 +requests>=2.31.0 + +# Development dependencies +pytest>=7.3.0 +black>=23.3.0 +isort>=5.12.0 +flake8>=6.0.0 +mypy>=1.2.0 +build>=0.10.0 +pre-commit>=4.2.0 +types-requests>=2.0.0 +types-gevent>=24.0.0 +types-PyYAML>=6.0.0 + +# Optional dependencies - install as needed: +# pip install fastapi>=0.95.0 uvicorn>=0.21.0 pydantic>=1.10.0 # For dashboard +# pip install bokeh>=3.1.0 # For Bokeh visualization +# pip install prometheus-client>=0.16.0 # For Prometheus metrics +# pip install statsd>=4.0.0 # For StatsD metrics +# pip install redis>=4.5.0 celery>=5.2.0 # For distributed testing +# pip install requests_kerberos>=0.14.0 # For Kerberos authentication +# pip install requests_ntlm>=1.2.0 # For NTLM authentication +# pip install requests_oauthlib>=1.3.1 # For OAuth authentication diff --git a/results/sample_result.yaml b/results/sample_result.yaml new file mode 100755 index 0000000..e69de29 diff --git a/scripts/run_stress_test.py b/scripts/run_stress_test.py new file mode 100644 index 0000000..31d8184 --- /dev/null +++ b/scripts/run_stress_test.py @@ -0,0 +1,25 @@ +from gevent import monkey + +monkey.patch_all() +import pathlib +from argparse import ArgumentParser +from gradual.base.orchestrator import Orchestrator +from logging import getLogger, INFO + +logger = getLogger() +logger.setLevel(INFO) + +parser = ArgumentParser() +parser.add_argument( + "--test_config", + type=pathlib.Path, + required=True, + help="Path to the test configuration file", +) +parser.add_argument( + "--request_config", type=pathlib.Path, help="Path to the request configuration file" +) +args = parser.parse_args() + +orchestrator = Orchestrator(args.test_config, args.request_config) +orchestrator.start_stress_test() diff --git a/src/gradual/__init__.py b/src/gradual/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/src/gradual/base/__init__.py b/src/gradual/base/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/src/gradual/base/orchestrator.py b/src/gradual/base/orchestrator.py new file mode 100644 index 0000000..f756588 --- /dev/null +++ b/src/gradual/base/orchestrator.py @@ -0,0 +1,63 @@ +""" +The orchestrator module provides the main coordination logic for stress testing. +It manages the execution of test phases and handles the overall test flow. 
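+
+Example (a minimal sketch mirroring scripts/run_stress_test.py; the YAML file
+names below are placeholders):
+
+    from gevent import monkey
+    monkey.patch_all()
+
+    from gradual.base.orchestrator import Orchestrator
+
+    orchestrator = Orchestrator("test_config.yaml", "request_config.yaml")
+    orchestrator.start_stress_test()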
+""" + +from logging import info + +import gevent + +from gradual.configs.parser import Parser +from gradual.runners.phase import Phase + + +class Orchestrator: + """ + Main orchestrator class that coordinates the execution of stress tests. + + The Orchestrator is responsible for: + 1. Loading and parsing test configurations + 2. Managing the execution of test phases + 3. Handling timing between test phases + 4. Coordinating the overall test flow + + Attributes: + test_config_file_path (str): Path to the main test configuration file + request_configs_path (str): Path to the request configurations file + parser (Parser): Instance of the configuration parser + """ + + def __init__(self, test_config_file_path: str, request_configs_path: str): + """ + Initialize the Orchestrator with configuration file paths. + + Args: + test_config_file_path (str): Path to the main test configuration file + request_configs_path (str): Path to the request configurations file + """ + self.test_config_file_path = test_config_file_path + self.request_configs_path = request_configs_path + self.parser = Parser(self.test_config_file_path, self.request_configs_path) + self.parser.read_configs() + + def start_stress_test(self): + """ + Start the stress test execution. + + This method: + 1. Iterates through each phase in the test configuration + 2. Creates and executes a Phase instance for each configuration + 3. Waits for the specified time between phases + 4. Uses gevent for concurrent execution of phases + """ + info("Starting stress test.") + for idx, phase_config in enumerate(self.parser.phases): + phase = Phase(phase_config, self.parser.run_name) + running_phase = gevent.spawn(phase.execute) + gevent.wait([running_phase]) + + if idx < len(self.parser.phases) - 1: + info( + f"waiting for {self.parser.phase_wait} secs before starting new phase." + ) + gevent.sleep(self.parser.phase_wait) diff --git a/src/gradual/configs/__init__.py b/src/gradual/configs/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/src/gradual/configs/parser.py b/src/gradual/configs/parser.py new file mode 100644 index 0000000..0a3242e --- /dev/null +++ b/src/gradual/configs/parser.py @@ -0,0 +1,217 @@ +""" +The parser module provides the Parser class which handles loading and parsing +configuration files for stress testing. It supports YAML configuration files +and validates the configuration structure. +""" + +from dataclasses import dataclass, field +from logging import info +from pathlib import Path + +import yaml + +from gradual.configs.request import RequestConfig +from gradual.configs.scenario import ScenarioConfig +from gradual.configs.phase import PhaseConfig +from gradual.configs.validate import assert_not_empty, validate_min_concurrency + + +def convert_list(val): + """ + Convert a single value to a list if it's not already a list. + + This utility function ensures that values that should be lists are always + in list format, converting single values to single-item lists. + + Args: + val: Value to convert (int or list) + + Returns: + list: The input value as a list + Raises: + TypeError: If the value is not an int or list + """ + if isinstance(val, int): + return [val] + if isinstance(val, list): + return val + raise TypeError(f"Expected int or list, got {type(val).__name__}: {val}") + + +@dataclass +class Parser: + """ + Configuration parser for stress testing setup. + + This class handles loading and parsing of configuration files, including: + 1. Test configuration files + 2. Request configuration files + 3. 
Parameter configuration files + + It validates the configuration structure and creates appropriate configuration + objects for phases, scenarios, and requests. + + Attributes: + test_config_file_path (str): Path to the main test configuration file + request_configs_path (str): Path to the request configurations file + run_name (str | None): Name of the test run + phases (list[PhaseConfig]): List of parsed phase configurations + phase_wait (int): Wait time between phases in seconds + """ + + test_config_file_path: str + request_configs_path: str + run_name: str | None = None + phases: list[PhaseConfig] = field(default_factory=list) + phase_wait: int = 0 + + @staticmethod + def read_request_file(file_path: Path): + """ + Read and parse a request configuration file. + + This method reads a YAML file containing request configurations and creates + RequestConfig objects for each request definition. It validates required fields + and handles optional parameters. + + Args: + file_path (Path): Path to the request configuration file + + Returns: + list[RequestConfig]: List of parsed request configurations + + Raises: + AssertionError: If required fields are missing + """ + request_config = [] + with file_path.open("r") as request_file: + requests = yaml.safe_load(request_file) + assert_not_empty("requests", requests.get("requests")) + for request_name, request in requests["requests"].items(): + assert_not_empty( + "params", + request.get("params"), + f"Please provide params for request: {request}.", + ) + assert_not_empty( + "method", + request.get("method"), + f"Please provide method for request: {request}.", + ) + assert_not_empty( + "expected_response_time", + request.get("expected_response_time"), + f"Please provide expected_response_time for request: {request}.", + ) + config = RequestConfig( + name=request_name, + url=request.get("url", ""), + params=request.get("params", {}), + http_method=request.get("method", "get"), + expected_response_time=request.get("expected_response_time", 0), + auth=request.get("auth", None), + ) + request_config.append(config) + return request_config + + def read_configs(self): + """ + Read and parse all configuration files. + + This method: + 1. Reads the main test configuration file + 2. Reads the request configurations file if specified + 3. Validates required fields and structure + 4. Creates phase, scenario, and request configurations + 5. 
Handles ramp-up and timing configurations + + Raises: + AssertionError: If required fields are missing or invalid + """ + info("Reading configs...") + + with open(self.test_config_file_path, "r") as scenario_file: + scenarios_config = yaml.safe_load(scenario_file) + + if self.request_configs_path: + with open(self.request_configs_path, "r") as param_file: + params_config = yaml.safe_load(param_file) + + else: + params_config = {} + + self.phases = [] + + assert_not_empty("run_name", scenarios_config["runs"]["name"]) + self.run_name = scenarios_config["runs"]["name"] + self.phase_wait = scenarios_config["runs"].get("wait_between_phases", 0) + + assert_not_empty("phases", scenarios_config["runs"].get("phases")) + + for phase_name, phase_data in scenarios_config["runs"]["phases"].items(): + scenarios = [] + + assert_not_empty( + f"scenarios for phase: {phase_name}", + phase_data.get("scenarios"), + ) + + for scenario_name, scenario_data in phase_data["scenarios"].items(): + request_configs = [] + + if scenario_data["requests"] == "FROM_REQUEST_YAML_FILE": + request_configs = self.read_request_file( + scenario_data["request_file"] + ) + else: + for scenario_request_name in scenario_data["requests"]: + request = params_config["requests"][scenario_request_name] + request_configs.append( + RequestConfig( + name=scenario_request_name, + url=request.get("url", ""), + params=request.get("params", {}), + http_method=request.get("method", "get"), + expected_response_time=request[ + "expected_response_time" + ], + auth=request.get("auth", None), + ) + ) + ramp_up = [] + ramp_up_wait = [] + ramp_up_multiply = scenario_data.get("ramp_up_multiply", None) + if ramp_up_multiply: + ramp_up = convert_list(ramp_up_multiply) + multiply = True + else: + ramp_up = convert_list(scenario_data.get("ramp_up_add", 0)) + multiply = False + + ramp_up_wait = convert_list(scenario_data.get("ramp_up_wait", [0.1])) + run_once = scenario_data.get("run_once", False) + iterate_through_requests = scenario_data.get( + "iterate_through_requests", False + ) + scenarios.append( + ScenarioConfig( + name=scenario_name, + min_concurrency=validate_min_concurrency( + scenario_data["min_concurrency"], multiply + ), + max_concurrency=scenario_data["max_concurrency"], + ramp_up=ramp_up, + ramp_up_wait=ramp_up_wait, + request_configs=request_configs, + multiply=multiply, + run_once=run_once, + iterate_through_requests=iterate_through_requests, + ) + ) + self.phases.append( + PhaseConfig( + name=phase_name, + scenario_config=scenarios, + runtime=phase_data["run_time"], + ) + ) diff --git a/src/gradual/configs/phase.py b/src/gradual/configs/phase.py new file mode 100644 index 0000000..5cd8524 --- /dev/null +++ b/src/gradual/configs/phase.py @@ -0,0 +1,58 @@ +""" +The phase module provides the PhaseConfig class which defines the configuration +for a test phase in stress testing. A phase represents a self-contained unit +of testing with its own runtime and scenario configurations. +""" + +from dataclasses import asdict, dataclass + +from gradual.configs.scenario import ScenarioConfig + + +@dataclass +class PhaseConfig: + """ + Configuration class for a test phase in stress testing. + + This class defines a self-contained unit of testing that includes: + 1. A set of scenario configurations to execute + 2. A runtime limit for the entire phase + 3. 
Methods for serialization and reporting + + Attributes: + name (str): Unique identifier for this phase + scenario_config (list[ScenarioConfig]): List of scenario configurations to execute + runtime (int): Maximum runtime for this phase in seconds + """ + + name: str + scenario_config: list[ScenarioConfig] + runtime: int + + @property + def phase_runtime(self): + """ + Get the runtime limit for this phase. + + Returns: + int: Maximum runtime for this phase in seconds + """ + return self.runtime + + def as_simple_obj(self): + """ + Convert the phase configuration to a simplified dictionary format. + + This method transforms the configuration into a more compact representation + suitable for reporting or serialization. It: + 1. Converts the configuration to a dictionary + 2. Converts each scenario configuration to its simplified form + + Returns: + dict: Simplified configuration object with all nested configurations + """ + obj_dict = asdict(self) + obj_dict["scenario_config"] = [ + scenario.as_simple_obj() for scenario in self.scenario_config + ] + return obj_dict diff --git a/src/gradual/configs/request.py b/src/gradual/configs/request.py new file mode 100644 index 0000000..0cc8d7e --- /dev/null +++ b/src/gradual/configs/request.py @@ -0,0 +1,75 @@ +""" +The request module provides configuration classes and utilities for managing API request +configurations in the stress testing framework. It includes support for different +request types (HTTP, WebSocket) and their specific parameters. +""" + +from dataclasses import dataclass +from typing import Any, Optional + +from gradual.constants.request_types import RequestType + + +def check_websocket_or_http(url): + """ + Determine if a URL is for HTTP or WebSocket based on its protocol. + + This function analyzes the URL protocol to determine the appropriate request type. + It supports both HTTP and WebSocket protocols. + + Args: + url (str): The URL to check + + Returns: + RequestType: The determined request type (http or websocket), or None if URL is empty + + Note: + The function checks the protocol part of the URL (before the first ':') + against known HTTP and WebSocket protocols. + """ + if not url: + return None + url_type = url.split(":")[0] + if url_type in RequestType.http.value: + return RequestType.http + if url_type in RequestType.websocket.value: + return RequestType.websocket + + +@dataclass +class RequestConfig: + """ + Configuration class for API requests in stress testing. + + This class defines the structure and parameters for individual API requests, + supporting both HTTP and WebSocket protocols. It includes validation and + automatic type detection based on the URL. + + Attributes: + name (str): Unique identifier for this request configuration + params (Optional[dict[str, Any]]): Parameters to be sent with the request + http_method (str): HTTP method to use (for HTTP requests) + expected_response_time (float): Expected response time in seconds + context (Optional[dict[str, Any]]): Additional context for the request + url (str): Target URL for the request + auth (Optional[str]): Authentication method to use + type (Optional[RequestType]): Type of request (HTTP or WebSocket) + """ + + name: str + params: Optional[dict[str, Any]] + http_method: str + expected_response_time: float + context: Optional[dict[str, Any]] = None + url: str = "" + auth: Optional[str] = None + type: Optional[RequestType] = RequestType.http + + def __post_init__(self): + """ + Post-initialization hook to set the request type based on the URL. 
+ + This method is automatically called after initialization to determine + the appropriate request type based on the URL protocol. + """ + self.type = check_websocket_or_http(self.url) diff --git a/src/gradual/configs/scenario.py b/src/gradual/configs/scenario.py new file mode 100644 index 0000000..de572fb --- /dev/null +++ b/src/gradual/configs/scenario.py @@ -0,0 +1,65 @@ +""" +The scenario module provides the ScenarioConfig class which defines the configuration +for a group of related API requests in stress testing. It manages concurrency, ramp-up +behavior, and request iteration settings. +""" + +from dataclasses import asdict, dataclass + +from gradual.configs.request import RequestConfig + + +@dataclass +class ScenarioConfig: + """ + Configuration class for a scenario of API requests in stress testing. + + This class defines how a group of related API requests should be executed, + including concurrency settings, ramp-up behavior, and request iteration patterns. + + Attributes: + name (str): Unique identifier for this scenario + min_concurrency (int): Minimum number of concurrent requests + max_concurrency (int): Maximum number of concurrent requests + ramp_up (list[int]): List of values for gradual increase in concurrency + ramp_up_wait (list[int]): List of wait times between ramp-up steps + request_configs (list[RequestConfig]): List of request configurations in this scenario + multiply (bool): Whether to multiply or add during ramp-up + run_once (bool): Whether requests should run only once + iterate_through_requests (bool): Whether to cycle through all requests + """ + + name: str + min_concurrency: int + max_concurrency: int + ramp_up: list[int] + ramp_up_wait: list[int] + request_configs: list[RequestConfig] + multiply: bool + run_once: bool + iterate_through_requests: bool + + def as_simple_obj(self): + """ + Convert the scenario configuration to a simplified dictionary format. + + This method transforms the configuration into a more compact representation + suitable for reporting or serialization. It: + 1. Converts the configuration to a dictionary + 2. Replaces request_configs with a count + 3. Renames ramp_up based on multiply setting + 4. Restructures the output with scenario name as key + + Returns: + dict: Simplified configuration object with scenario name as key + """ + obj_dict = asdict(self) + obj_dict["no_of_requests"] = len(obj_dict.pop("request_configs")) + if self.multiply: + obj_dict["ramp_up_multiply"] = obj_dict.pop("ramp_up", None) + else: + obj_dict["ramp_up_add"] = obj_dict.pop("ramp_up", None) + + obj_dict.pop("name") + obj_dict = {self.name: obj_dict} + return obj_dict diff --git a/src/gradual/configs/validate.py b/src/gradual/configs/validate.py new file mode 100644 index 0000000..10716a5 --- /dev/null +++ b/src/gradual/configs/validate.py @@ -0,0 +1,55 @@ +""" +The validate module provides utility functions for validating stress testing +configurations. It includes functions for checking minimum concurrency values +and ensuring required properties are not empty. +""" + +from logging import warning + +from gradual.exceptions import InvalidConfigError + + +def validate_min_concurrency(min_concurrency, multiple): + """ + Validate and adjust minimum concurrency value based on ramp-up mode. + + This function ensures that minimum concurrency is valid when using + multiplication-based ramp-up. It prevents zero concurrency scenarios + that would result in no test execution. 
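+    For example, a minimum concurrency of 0 multiplied by any ramp-up factor is
+    still 0, so the test would never generate load.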
+
+    Args:
+        min_concurrency (int): The minimum concurrency value to validate
+        multiple (bool): Whether multiplication-based ramp-up is being used
+
+    Returns:
+        int: The validated minimum concurrency value
+
+    Note:
+        If multiple is True and min_concurrency is 0, the function returns 1
+        to prevent zero concurrency scenarios.
+    """
+    if multiple and min_concurrency == 0:
+        warning(
+            "You have passed a ramp-up multiplier with minimum concurrency 0, which would result in 0 concurrency. Setting minimum concurrency to 1."
+        )
+        return 1
+    return min_concurrency
+
+
+def assert_not_empty(prop, value, error_msg=None):
+    """
+    Assert that a configuration property has a non-empty value.
+
+    This function validates that required configuration properties are provided
+    and not empty. It raises an InvalidConfigError if the validation fails.
+
+    Args:
+        prop (str): Name of the property being validated
+        value: Value to check for emptiness
+        error_msg (str, optional): Custom error message to use
+
+    Raises:
+        InvalidConfigError: If the value is empty or None
+    """
+    if not value:
+        raise InvalidConfigError(prop, error_msg)
diff --git a/src/gradual/constants/__init__.py b/src/gradual/constants/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/gradual/constants/request_types.py b/src/gradual/constants/request_types.py
new file mode 100644
index 0000000..fcc2cbd
--- /dev/null
+++ b/src/gradual/constants/request_types.py
@@ -0,0 +1,24 @@
+"""
+The request_types module provides the RequestType enum which defines the supported
+types of API requests in the stress testing framework. It includes HTTP and WebSocket
+protocols with their respective URL schemes.
+"""
+
+from enum import Enum
+
+
+class RequestType(Enum):
+    """
+    Enumeration of supported API request types and their URL schemes.
+
+    This enum defines the different types of API requests that can be made
+    in the stress testing framework, along with their corresponding URL
+    protocol schemes.
+
+    Attributes:
+        websocket (list[str]): List of WebSocket URL schemes (wss, ws)
+        http (list[str]): List of HTTP URL schemes (http, https)
+    """
+
+    websocket = ["wss", "ws"]
+    http = ["http", "https"]
diff --git a/src/gradual/exceptions.py b/src/gradual/exceptions.py
new file mode 100644
index 0000000..8b9463e
--- /dev/null
+++ b/src/gradual/exceptions.py
@@ -0,0 +1,34 @@
+"""
+This module contains custom exceptions used throughout the stress testing framework.
+These exceptions help provide clear error messages and handle specific error cases
+that may occur during test configuration and execution.
+"""
+
+class InvalidConfigError(ValueError):
+    """
+    Exception raised when the user passes invalid or missing configuration properties.
+
+    This exception is used to indicate configuration errors in the stress testing framework,
+    such as missing required parameters or invalid values in the configuration files.
+
+    Attributes:
+        prop (str, optional): The property for which the configuration is invalid or empty.
+        msg (str, optional): Detailed explanation of the error. If not provided, a default
+            message will be generated based on the property name.
+    """
+
+    def __init__(self, prop=None, msg=None):
+        """
+        Initialize the InvalidConfigError with property and message details.
+
+        Args:
+            prop (str, optional): The property that caused the error
+            msg (str, optional): Custom error message. If not provided, a default message
+                will be generated using the property name.
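+
+        Example (illustrative; "run_name" stands in for any required property):
+            raise InvalidConfigError("run_name")
+            # -> ValueError: Please provide a value for run_name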
+        """
+        if not msg:
+            self.message = f"Please provide a value for {prop}"
+        else:
+            self.message = msg
+
+        super().__init__(self.message)
diff --git a/src/gradual/reporting/__init__.py b/src/gradual/reporting/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/gradual/reporting/adapters/__init__.py b/src/gradual/reporting/adapters/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/gradual/reporting/adapters/base.py b/src/gradual/reporting/adapters/base.py
new file mode 100644
index 0000000..ebe124f
--- /dev/null
+++ b/src/gradual/reporting/adapters/base.py
@@ -0,0 +1,6 @@
+class Adapter:
+    def __init__(self, *args, **kwargs):
+        self.stats = kwargs.get("stats")
+
+    def process_stats(self, stat_data: dict):
+        raise NotImplementedError("process_stats method must be implemented")
diff --git a/src/gradual/reporting/adapters/logging.py b/src/gradual/reporting/adapters/logging.py
new file mode 100644
index 0000000..73e012f
--- /dev/null
+++ b/src/gradual/reporting/adapters/logging.py
@@ -0,0 +1,26 @@
+import logging
+from gradual.reporting.adapters.base import Adapter
+from gradual.reporting.logger import size_based_logger
+
+
+class LoggingAdapter(Adapter):
+    """
+    Adapter that logs the stats to a file.
+
+    Args:
+        logger: The logger to use to log the stats.
+        *args: Additional arguments to pass to the Adapter class.
+        **kwargs: Additional keyword arguments to pass to the Adapter class.
+    """
+
+    def __init__(
+        self,
+        logger: logging.Logger = size_based_logger("stress_test"),
+        *args,
+        **kwargs,
+    ):
+        self.Logger = logger
+        super().__init__(*args, **kwargs)
+
+    def process_stats(self, stat_data: dict):
+        self.Logger.info(list(stat_data.items()))
diff --git a/src/gradual/reporting/logger.py b/src/gradual/reporting/logger.py
new file mode 100644
index 0000000..b12f98c
--- /dev/null
+++ b/src/gradual/reporting/logger.py
@@ -0,0 +1,43 @@
+import logging
+from logging.handlers import RotatingFileHandler
+from pathlib import Path
+
+BACKUP_COUNT = 15
+FILE_SIZE = 5 * 1024 * 1024
+
+
+def size_based_logger(
+    name: str,
+    file_size: int = FILE_SIZE,
+    backup_count: int = BACKUP_COUNT,
+    log_dir: str = "logs/stress_test",
+):
+    """
+    Create a logger that logs to a file.
+
+    Args:
+        name: The name of the logger.
+        file_size: The size of the log file in bytes.
+        backup_count: The number of backup log files to keep.
+        log_dir: The directory to log to.
+    """
+    log_dir_path = Path(log_dir)
+    Path.mkdir(log_dir_path, exist_ok=True, parents=True)
+
+    logger = logging.getLogger(name)
+    logger.setLevel(logging.INFO)
+
+    log_file = log_dir_path / f"{name}.log"
+
+    # Rotate once the file reaches file_size bytes; keep backup_count rotated files
+    handler = RotatingFileHandler(
+        log_file, maxBytes=file_size, backupCount=backup_count, encoding="utf-8"
+    )
+
+    formatter = logging.Formatter(
+        "[%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d] %(message)s"
+    )
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+    logger.propagate = False
+    return logger
diff --git a/src/gradual/reporting/stats.py b/src/gradual/reporting/stats.py
new file mode 100644
index 0000000..c61efb5
--- /dev/null
+++ b/src/gradual/reporting/stats.py
@@ -0,0 +1,143 @@
+"""
+The stats module provides the Stats class which handles collection and processing
+of test statistics in the stress testing framework. It implements a singleton
+pattern and uses multiprocessing for efficient stats collection and processing.
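+
+Typical usage (a minimal sketch; phase_config comes from the configuration parser
+and "smoke-run" is a placeholder run name):
+
+    stats = Stats(phase_config, "smoke-run")
+    stats.start_process_stats()
+    stats.persist_stats({"request_name": "ping", "response_time": 12})
+    stats.close_process_stats()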
+""" + +from logging import error, getLogger, info +import traceback +from typing import Optional +from gradual.configs.phase import PhaseConfig +from multiprocessing import Event, Process, Queue +from queue import Empty +import time +from gradual.reporting.adapters.base import Adapter +from gradual.reporting.adapters.logging import LoggingAdapter + + +class Stats: + """ + Singleton class for managing test statistics collection and processing. + + This class provides functionality for: + 1. Collecting test statistics in a thread-safe manner + 2. Processing statistics in a separate process + 3. Managing test timing and runtime tracking + 4. Supporting database persistence of statistics + + Attributes: + _instance (Stats): Singleton instance of the Stats class + stop_writing (Event): Event to signal when to stop processing stats + stats_queue (Queue): Queue for passing stats between processes + phase_config (PhaseConfig): Configuration for the current test phase + test_start_time (int): Timestamp when the test started + test_end_time (int): Timestamp when the test ended + write_db_process (Process): Process for handling stats persistence + run_name (str): Name of the current test run + """ + + _instance = None + stop_writing = Event() + + def __init__(self, phase_config: PhaseConfig, run_name: str): + """ + Initialize a new Stats instance. + + Args: + phase_config (PhaseConfig): Configuration for the current test phase + run_name (str): Name of the current test run + """ + self.stats_queue: Queue = Queue() + self.phase_config = phase_config + self.test_start_time: int + self.test_end_time: int + self.write_db_process = Process(target=self.process_stats, args=()) + self.run_name = run_name + + def start_process_stats(self): + """ + Start the process that processes the stats. + """ + self.write_db_process.start() + + def close_process_stats(self): + """ + Terminate the process that processes the stats. + """ + self.write_db_process.terminate() + + def process_stats(self): + """ + Process statistics in a separate process. + + This method runs in a separate process and: + 1. Listens to the stats queue for new statistics + 2. Processes received statistics using the provided adapters + 3. Continues until stop_writing event is set + + Note: + The method uses a timeout of 1 second when waiting for new stats + to allow for graceful shutdown. The timeout is used to avoid + blocking the main process. + """ + while not self.stop_writing.is_set(): + try: + stats, adapters = self.stats_queue.get( + timeout=1 + ) # wait up to 1 sec for a stat to be available + for adapter in adapters: + try: + adapter.process_stats(stats) + except Exception as e: + error( + f"Stat processing failed with {adapter} with exception: {e}" + ) + error(traceback.format_exc()) + except Empty: + # queue is empty. so do nothing + pass + + def persist_stats(self, stats, adapters: Optional[list[Adapter]] = None): + """ + Add statistics to the processing queue. + + Args: + stats: Statistics to be processed and persisted + adapters: Adapters to be used to process the stats. + """ + if adapters is None: + adapters = [LoggingAdapter()] + self.stats_queue.put((stats, adapters)) + + @classmethod + def get_stats_instance(cls): + """ + Get the singleton instance of the Stats class. + + Returns: + Stats: The singleton instance + """ + return cls._instance + + def __new__(cls, *args, **kwargs): + """ + Create or return the singleton instance. 
+ + This method implements the singleton pattern, ensuring only one + instance of the Stats class exists. + + Returns: + Stats: The singleton instance + """ + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def current_runtime(self): + """ + Calculate the current runtime of the test. + + Returns: + float: Time elapsed since test start in seconds + """ + return time.time() - self.test_start_time diff --git a/src/gradual/runners/__init__.py b/src/gradual/runners/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/src/gradual/runners/iterators.py b/src/gradual/runners/iterators.py new file mode 100644 index 0000000..fefc909 --- /dev/null +++ b/src/gradual/runners/iterators.py @@ -0,0 +1,58 @@ +""" +The iterators module provides the RequestIterator class which manages cycling through +different request configurations in a round-robin fashion. This is used to distribute +load across different types of API requests during stress testing. +""" + +from dataclasses import dataclass +from typing import Optional +from gradual.configs.request import RequestConfig + + +@dataclass +class RequestIterator: + """ + Iterator for cycling through different request configurations. + + This class provides a round-robin mechanism to cycle through different types + of API requests during stress testing. It maintains the current position and + provides methods to get the next request configuration. + + Attributes: + request_types (list[RequestConfig]): List of request configurations to cycle through + request_type_index (int): Current index in the request_types list + current (int): Index of the last returned request type, None if no requests have been returned + """ + + request_types: list[RequestConfig] + request_type_index: int = 0 + current: Optional[int] = None + + def get_next_request(self): + """ + Get the next request configuration in the round-robin sequence. + + Returns: + RequestConfig: The next request configuration to use + + Note: + This method cycles through the request_types list in a round-robin fashion, + returning to the beginning when it reaches the end. + """ + self.current = self.request_type_index + request_type = self.request_types[self.request_type_index] + self.request_type_index += 1 + self.request_type_index %= len(self.request_types) + return request_type + + @property + def current_request(self): + """ + Get the current request configuration. + + Returns: + RequestConfig: The current request configuration, or None if no requests have been returned yet + """ + if self.current is not None: + return self.request_types[self.current] + return None diff --git a/src/gradual/runners/phase.py b/src/gradual/runners/phase.py new file mode 100644 index 0000000..a68d266 --- /dev/null +++ b/src/gradual/runners/phase.py @@ -0,0 +1,94 @@ +""" +The phase module provides the Phase class which represents a single test phase +in the stress testing framework. A phase is a self-contained unit of testing that +can be executed independently and has its own configuration and runtime constraints. +""" + +from logging import info + +import gevent + +from gradual.configs.phase import PhaseConfig +from gradual.reporting.stats import Stats +from gradual.runners.runner import Runner + + +class Phase: + """ + Represents a single test phase in the stress testing framework. + + A phase is a self-contained unit of testing that: + 1. Has its own configuration and runtime constraints + 2. Manages its own test runner and reporting + 3. 
Can be executed independently of other phases + 4. Has timeout protection to prevent indefinite execution + + Attributes: + phase_config (PhaseConfig): Configuration for this test phase + reporting_object (Stats): Statistics and reporting handler for this phase + runner (Runner): Test runner instance that executes the actual tests + """ + + def __init__(self, phase_config: PhaseConfig, run_name: str): + """ + Initialize a new test phase. + + Args: + phase_config (PhaseConfig): Configuration for this test phase + run_name (str): Unique identifier for this test run + """ + # Validation + if not phase_config.name: + raise ValueError("Phase name must not be empty") + if phase_config.phase_runtime is not None and phase_config.phase_runtime < 0: + raise ValueError("Phase runtime must be non-negative") + self.phase_config = phase_config + self.reporting_object = Stats(self.phase_config, run_name) + self.runner = Runner(self.phase_config.scenario_config) + + def execute(self): + """ + Execute the test phase. + + This method: + 1. Spawns a new test runner in a gevent greenlet + 2. Monitors the execution with a timeout + 3. Handles timeout conditions gracefully + 4. Manages the lifecycle of the test execution + """ + info("Starting stats processing...") + self.reporting_object.start_process_stats() + + info(f"Executing phase {self.phase_config.name}") + + start_test_task = gevent.spawn(self.runner.start_test) + + try: + with gevent.Timeout( + self.phase_config.phase_runtime, + TimeoutError("Phase exceeded runtime."), + ): + gevent.wait(objects=[start_test_task]) + info("Phase run complete.") + except TimeoutError: + info("Runtime exceeding. stopping the phase now.") + self.stop_phase() + except Exception: + self.stop_phase() + raise + + info("Closing stats processing...") + self.reporting_object.close_process_stats() + + def stop_phase(self): + """ + Stop the test phase execution. + + This method: + 1. Logs the stopping of the phase + 2. Stops the test runner + 3. Confirms the phase has been stopped + """ + info(f"Stopping Phase {self.phase_config.name}") + self.runner.stop_runner() + info(f"Stopped Phase {self.phase_config.name}") diff --git a/src/gradual/runners/request/Http.py b/src/gradual/runners/request/Http.py new file mode 100644 index 0000000..05e4598 --- /dev/null +++ b/src/gradual/runners/request/Http.py @@ -0,0 +1,251 @@ +""" +The Http module provides the HttpRequest class which implements HTTP-based API requests +for stress testing. It supports various HTTP methods, authentication mechanisms, +and response tracking. +""" + +from gradual.runners.request.base import _Request +from gradual.runners.session import HTTPSession +import gevent +from requests import Response +from gradual.runners.iterators import RequestIterator +from logging import debug, error, warning, info +from time import time_ns +import traceback +import uuid +from gradual.configs.request import RequestConfig + + +class HttpRequest(_Request): + """ + Implementation of HTTP-based API requests for stress testing. + + This class provides functionality for: + 1. Making HTTP requests with different methods (GET, POST, PUT, DELETE) + 2. Supporting Kerberos authentication + 3. Tracking response times and status codes + 4. Managing request sessions and connection pooling + 5. 
Handling request completion and statistics + + Attributes: + session (HTTPSession): HTTP session for making requests + _kerberos_available (bool): Whether Kerberos support is available + _kerberos_auth (HTTPKerberosAuth or None): Cached Kerberos auth handler + """ + + def __init__( + self, + scenario_name: str, + session: HTTPSession, + run_once: bool, + iterator: RequestIterator, + ): + """ + Initialize a new HTTP request instance. + + Args: + scenario_name (str): Name of the scenario this request belongs to + session (HTTPSession): HTTP session for making requests + run_once (bool): Whether the request should run only once + iterator (RequestIterator): Iterator for cycling through request configurations + """ + super().__init__( + scenario_name=scenario_name, run_once=run_once, iterator=iterator + ) + self.session = session + self._kerberos_available = None + self._kerberos_auth = None + + def _check_kerberos_availability(self): + """ + Check if Kerberos support is available and cache the result. + + Returns: + bool: True if Kerberos support is available, False otherwise + """ + if self._kerberos_available is not None: + return self._kerberos_available + + try: + from requests_kerberos import DISABLED, HTTPKerberosAuth + + self._kerberos_auth = HTTPKerberosAuth(mutual_authentication=DISABLED) + self._kerberos_available = True + info("Kerberos authentication support is available") + return True + except ImportError: + self._kerberos_available = False + warning( + "Kerberos authentication support is not available. " + "Install it with: pip install -e .[kerberos]" + ) + return False + except Exception as e: + self._kerberos_available = False + error(f"Error initializing Kerberos support: {str(e)}") + return False + + def requires_kerberos(self, request_type: RequestConfig) -> bool: + """ + Check if a request requires Kerberos authentication. + + Args: + request_type (RequestConfig): The request configuration to check + + Returns: + bool: True if the request requires Kerberos authentication + """ + return request_type.auth == "kerb" + + def get_kerberos_auth(self): + """ + Get Kerberos authentication handler if available. + + Returns: + HTTPKerberosAuth or None: Kerberos authentication handler if available, + None if Kerberos support is not installed. + """ + if not self._check_kerberos_availability(): + return None + return self._kerberos_auth + + def send_request(self, request_type: RequestConfig, req_kwargs): + """ + Send an HTTP request based on the request configuration. + + This method supports different HTTP methods and handles the actual + request sending using the configured session. + + Args: + request_type (RequestConfig): Configuration for this request + req_kwargs (dict): Keyword arguments for the request + + Returns: + Response: The HTTP response from the server + + Raises: + ValueError: If an unsupported HTTP method is specified + """ + method = request_type.http_method.lower() + + if method == "get": + return self.session.get(**req_kwargs) + + if method == "post": + return self.session.post(**req_kwargs) + + if method == "put": + return self.session.put(**req_kwargs) + + if method == "delete": + return self.session.delete(**req_kwargs) + + else: + raise ValueError("Unsupported HTTP method") + + def on_request_completion( + self, + request_type: RequestConfig, + response: Response, + response_time, + start_time, + end_time, + params, + ): + """ + Handle HTTP request completion and record statistics. + + This method is called after each HTTP request completes to: + 1. 
Collect response data and timing information + 2. Record statistics for analysis + 3. Log debug information + + Args: + request_type (RequestConfig): Configuration for this request + response (Response): The HTTP response from the server + response_time: Time taken for the request in nanoseconds + start_time: Start time of the request in nanoseconds + end_time: End time of the request in nanoseconds + params (dict): Parameters used in the request + """ + stat_data = { + "request_name": request_type.name, + "url": request_type.url, + "params": params, + "context": request_type.context, + "response_time": response_time, + "status_code": response.status_code, + "start_time": start_time, + "end_time": end_time, + "iid": params["iid"], + "scenario_name": self.scenario_name, + "expected_response_time": request_type.expected_response_time, + } + + debug(stat_data) + self.stats_instance.persist_stats(stat_data) + + def run(self): + """ + Execute the HTTP request in a loop until stopped. + + This method: + 1. Makes HTTP requests in a loop until stop_request is True + 2. Handles request preparation and sending + 3. Tracks timing and response data + 4. Manages authentication and headers + 5. Handles errors and exceptions + 6. Closes the session when done + + Note: + The method will exit after a single request if run_once is True. + """ + while not self.stop_request: + try: + iid = str(uuid.uuid4()) + request_type = self.iterator.get_next_request() + data = request_type.params | {"iid": iid} + req_kwargs = { + "headers": { + "X-ANTICSRF-HEADER": "DESCO", + "Content-type": "application/json", + }, + "json": data, + "url": request_type.url, + } + + # Handle Kerberos authentication + if self.requires_kerberos(request_type): + auth = self.get_kerberos_auth() + if auth: + req_kwargs["auth"] = auth + else: + raise Exception( + f"Skipping request '{request_type.name}' due to missing Kerberos authentication" + ) + + start_time = time_ns() + response = self.send_request( + request_type=request_type, req_kwargs=req_kwargs + ) + end_time = time_ns() + response_time_ns = end_time - start_time + gevent.spawn( + self.on_request_completion, + request_type, + response, + response_time_ns, + start_time, + end_time, + data, + ) + except Exception as e: + error( + f"Error in request '{request_type.name if 'request_type' in locals() else 'unknown'}': {str(e)}" + ) + error(traceback.format_exc()) + + if self.run_once: + self.stop_request = True + break + self.session.close() diff --git a/src/gradual/runners/request/SocketIO.py b/src/gradual/runners/request/SocketIO.py new file mode 100644 index 0000000..0fea75e --- /dev/null +++ b/src/gradual/runners/request/SocketIO.py @@ -0,0 +1,163 @@ +""" +The SocketIO module provides the SocketRequest class which implements WebSocket-based +API requests for stress testing. It supports WebSocket connections, message sending, +and response tracking. +""" + +from gradual.runners.request.base import _Request +from logging import debug, error +import gevent +import json +from time import time_ns +import traceback +import uuid +from websocket import create_connection +from functools import cache +from gradual.configs.request import RequestConfig + + +class SocketRequest(_Request): + """ + Implementation of WebSocket-based API requests for stress testing. + + This class provides functionality for: + 1. Establishing and managing WebSocket connections + 2. Sending messages through WebSocket + 3. Receiving and processing responses + 4. Tracking connection and message timing + 5. 
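
The request loop above measures each call with `time_ns()` and hands the bookkeeping to a separate greenlet via `gevent.spawn`, so stats persistence never delays the next request. A reduced sketch of that measure-then-offload shape, using httpbin.org and a plain `print` as stand-ins for the real target URL and `Stats.persist_stats`:

```python
import gevent
import requests
from time import time_ns


def record(name, status_code, response_time_ns):
    # Stand-in for Stats.persist_stats(): runs in its own greenlet.
    print(f"{name}: {status_code} in {response_time_ns / 1e6:.1f} ms")


def timed_get(name, url):
    start = time_ns()
    response = requests.get(url, timeout=10)
    end = time_ns()
    # Offload reporting so the caller can immediately issue the next request.
    return gevent.spawn(record, name, response.status_code, end - start)


if __name__ == "__main__":
    reporter = timed_get("example", "https://httpbin.org/get")
    reporter.join()  # let the reporting greenlet finish before exiting
```
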
Handling connection errors and failures + + Note: + The class uses caching for WebSocket connections to improve performance + and reduce connection overhead. + """ + + def on_request_completion( + self, + request_type: RequestConfig, + response: tuple, + response_time, + start_time, + end_time, + params, + ): + """ + Handle WebSocket request completion and record statistics. + + This method is called after each WebSocket interaction completes to: + 1. Collect response data and timing information + 2. Record statistics for analysis + 3. Log debug information + + Args: + request_type (RequestConfig): Configuration for this request + response (tuple): Tuple containing (status_code, response_message) + response_time: Time taken for the interaction in nanoseconds + start_time: Start time of the interaction in nanoseconds + end_time: End time of the interaction in nanoseconds + params (dict): Parameters used in the message + """ + stat_data = { + "request_name": request_type.name, + "url": request_type.url, + "params": params, + "context": request_type.context, + "response_time": response_time, + "status_code": response[0], + "start_time": start_time, + "end_time": end_time, + "iid": params["iid"], + "scenario_name": self.scenario_name, + "expected_response_time": request_type.expected_response_time, + } + + debug(stat_data) + self.stats_instance.persist_stats(stat_data) + + @cache + def create_ws_connection(url): + """ + Create a WebSocket connection to the specified URL. + + This method is cached to reuse connections and improve performance. + It handles connection errors and provides detailed error logging. + + Args: + url (str): WebSocket server URL to connect to + + Returns: + WebSocket: Established WebSocket connection + + Raises: + Exception: If connection fails, with detailed error information + """ + try: + ws = create_connection(url) + except Exception as e: + error(f"web socket failed to secure a connection with error: {e}") + error(traceback.format_exc()) + raise e + return ws + + def run(self): + """ + Execute the WebSocket request in a loop until stopped. + + This method: + 1. Establishes WebSocket connections + 2. Sends messages in a loop until stop_request is True + 3. Receives and processes responses + 4. Tracks timing and response data + 5. Handles connection errors and failures + 6. Closes connections when done + + Note: + The method will exit after a single interaction if run_once is True. + It handles both successful and failed message sending/receiving scenarios. 
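
Connection reuse here relies on `functools.cache` keyed on the URL: the first call to the helper pays the WebSocket handshake cost, and later calls with the same URL get the already-open connection back. A small standalone sketch of that caching idea, written as a module-level helper so the cache key is just the URL (the echo-server URL is only an example):

```python
from functools import cache

from websocket import create_connection  # provided by the websocket-client package


@cache
def get_ws_connection(url: str):
    # Only the first call per URL performs the handshake; later calls are cache hits.
    return create_connection(url)


if __name__ == "__main__":
    ws_a = get_ws_connection("wss://echo.websocket.events")
    ws_b = get_ws_connection("wss://echo.websocket.events")
    assert ws_a is ws_b  # same underlying connection object, handshake paid once
    ws_a.close()
```
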
+ """ + while not self.stop_request: + iid = str(uuid.uuid4()) + request_type = self.handler.get_next_request() + ws = self.create_ws_connection(request_type.url) + data = request_type.params | {"iid": iid} + start_time = time_ns() + response_code = 200 + response = None + is_sent = False + try: + ws.send(json.dumps(request_type.params)) + is_sent = True + except Exception as e: + response_code = 503 + error( + f"Failed sending the message through websocket connection with error: {e}" + ) + error(traceback.format_exc()) + + try: + if is_sent: + response = ws.recv() + except Exception as e: + response_code = 500 + error( + f"Failed receiving the message through websocket connection with error: {e}" + ) + error(traceback.format_exc()) + finally: + if response is not None and "Success" not in response: + error(response) + end_time = time_ns() + response_time_ns = end_time - start_time + gevent.spawn( + self.on_request_completion, + request_type, + (response_code, response), + response_time_ns, + start_time, + end_time, + data, + ) + if self.run_once: + self.stop_request = True + break + ws.close() diff --git a/src/gradual/runners/request/__init__.py b/src/gradual/runners/request/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/gradual/runners/request/base.py b/src/gradual/runners/request/base.py new file mode 100644 index 0000000..1cf334d --- /dev/null +++ b/src/gradual/runners/request/base.py @@ -0,0 +1,71 @@ +""" +The base module provides the _Request abstract base class which defines the interface +for all request implementations in the stress testing framework. This class serves as +the foundation for different types of API requests (HTTP, WebSocket, etc.). +""" + +from gradual.reporting.stats import Stats +from gradual.runners.iterators import RequestIterator + + +class _Request: + """ + Abstract base class for all request implementations. + + This class defines the common interface and functionality that all request types + must implement. It provides basic infrastructure for: + 1. Managing request lifecycle + 2. Tracking request statistics + 3. Handling request iteration + 4. Supporting graceful shutdown + + Attributes: + stop_request (bool): Flag to control request execution + stats_instance (Stats): Statistics tracking instance + scenario_name (str): Name of the scenario this request belongs to + run_once (bool): Whether the request should run only once + iterator (RequestIterator): Iterator for cycling through request configurations + """ + + def __init__(self, scenario_name: str, run_once: bool, iterator: RequestIterator): + """ + Initialize a new request instance. + + Args: + scenario_name (str): Name of the scenario this request belongs to + run_once (bool): Whether the request should run only once + iterator (RequestIterator): Iterator for cycling through request configurations + """ + self.stop_request = False + self.stats_instance = Stats.get_stats_instance() + self.scenario_name = scenario_name + self.run_once = run_once + self.iterator = iterator + + def on_request_completion(self, *args, **kwargs): + """ + Handle request completion events. + + This method should be implemented by subclasses to handle specific + completion events for their request type. + + Args: + *args: Variable length argument list + **kwargs: Arbitrary keyword arguments + + Raises: + NotImplementedError: This method must be implemented by subclasses + """ + raise NotImplementedError("Expected subclasses to implement this method.") + + def run(self): + """ + Execute the request. 
+ + This method should be implemented by subclasses to define the specific + execution logic for their request type. + + Raises: + NotImplementedError: This method must be implemented by subclasses + """ + raise NotImplementedError("Expected subclasses to implement this method.") diff --git a/src/gradual/runners/runner.py b/src/gradual/runners/runner.py new file mode 100644 index 0000000..292700d --- /dev/null +++ b/src/gradual/runners/runner.py @@ -0,0 +1,72 @@ +""" +The runner module provides the Runner class which manages the execution of test scenarios. +It coordinates multiple test scenarios running concurrently using gevent for asynchronous execution. +""" + +from logging import info +from time import perf_counter_ns + +import gevent + +from gradual.configs.scenario import ScenarioConfig +from gradual.runners.scenario import Scenario + + +class Runner: + """ + Manages the execution of multiple test scenarios concurrently. + + The Runner is responsible for: + 1. Initializing test scenarios from configurations + 2. Managing concurrent execution of scenarios using gevent + 3. Tracking execution time and running state + 4. Providing graceful shutdown capabilities + + Attributes: + scenarios (list[Scenario]): List of initialized test scenarios + start_counter (int): Nanosecond timestamp when the runner started + running_scenarios (list[Scenario]): List of currently running scenarios + running_scenarios_task (list): List of gevent tasks for running scenarios + """ + + def __init__(self, scenarios: list[ScenarioConfig]): + """ + Initialize the test runner with scenario configurations. + + Args: + scenarios (list[ScenarioConfig]): List of scenario configurations to run + """ + self.scenarios = [Scenario(scenario_config) for scenario_config in scenarios] + self.start_counter = perf_counter_ns() + self.running_scenarios: list[Scenario] = [] + self.running_scenarios_task: list[gevent.Greenlet] = [] + + def start_test(self): + """ + Start the execution of all test scenarios. + + This method: + 1. Spawns a gevent task for each scenario + 2. Tracks running scenarios and their tasks + 3. Waits for all scenarios to complete execution + """ + info("Executing scenarios...") + + for scenario in self.scenarios: + self.running_scenarios.append(scenario) + self.running_scenarios_task.append(gevent.spawn(scenario.execute)) + gevent.wait(self.running_scenarios_task) + + def stop_runner(self): + """ + Stop all running test scenarios gracefully. + + This method: + 1. Signals all running scenarios to stop + 2. Waits for all scenario tasks to complete + 3. Ensures clean shutdown of all test activities + """ + info("Stopping runner.") + for scenario in self.running_scenarios: + scenario.stop_scenario_execution = True + gevent.wait(self.running_scenarios_task) diff --git a/src/gradual/runners/scenario.py b/src/gradual/runners/scenario.py new file mode 100644 index 0000000..a1fb0b0 --- /dev/null +++ b/src/gradual/runners/scenario.py @@ -0,0 +1,235 @@ +""" +The scenario module provides the Scenario class which manages the execution of test requests +within a scenario. It handles concurrency, ramp-up, and different types of requests (HTTP, WebSocket). 
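
The Runner above is a thin fan-out/fan-in layer: one greenlet per scenario, a single `gevent.wait` to join them, and shutdown implemented as a cooperative flag that each scenario checks. A minimal sketch of that shape with a toy worker (the `Worker` and `run_all` names are illustrative):

```python
import gevent


class Worker:
    """Stand-in for Scenario: loops until asked to stop."""

    def __init__(self, name: str):
        self.name = name
        self.stop = False

    def execute(self):
        while not self.stop:
            gevent.sleep(0.1)  # one unit of work per iteration
        print(f"{self.name} stopped")


def run_all(workers):
    tasks = [gevent.spawn(w.execute) for w in workers]
    gevent.sleep(0.5)      # let them run for a while
    for w in workers:      # cooperative stop: flip the flag, then join
        w.stop = True
    gevent.wait(tasks)


if __name__ == "__main__":
    run_all([Worker("scenario-1"), Worker("scenario-2")])
```
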
+""" + +from logging import info +from gradual.configs.scenario import ScenarioConfig +from gradual.constants.request_types import RequestType +from gradual.runners.request.base import _Request +from gradual.runners.request.Http import HttpRequest +from gradual.runners.request.SocketIO import SocketRequest +from gradual.runners.iterators import RequestIterator +from gradual.runners.session import HTTPSession +import gevent +from tabulate import tabulate + + +class Scenario: + """ + Manages the execution of test requests within a scenario. + + The Scenario class is responsible for: + 1. Managing concurrent execution of test requests + 2. Handling ramp-up of concurrent requests + 3. Supporting different types of requests (HTTP, WebSocket) + 4. Managing test sessions and iterators + 5. Providing graceful shutdown capabilities + + Attributes: + scenario_config (ScenarioConfig): Configuration for this test scenario + running_request_tasks (list[gevent.Greenlet]): List of currently running request tasks + last_request_idx (int): Index of the last request type used + stop_scenario_execution (bool): Flag to control scenario execution + requests (list[_Request]): List of request instances + iterator (RequestIterator, optional): Iterator for cycling through request types + """ + + def __init__(self, scenario_config: ScenarioConfig): + """ + Initialize a new test scenario. + + Args: + scenario_config (ScenarioConfig): Configuration for this test scenario + """ + self.scenario_config = scenario_config + self.running_request_tasks: list[gevent.Greenlet] = [] + self.last_request_idx: int = 0 + self.stop_scenario_execution = False + self.requests: list[_Request] = [] + self.iterator = None + if self.scenario_config.iterate_through_requests: + self.iterator = RequestIterator( + request_types=self.scenario_config.request_configs + ) + + def do_ramp_up(self, ramp_up_value): + """ + Increase the number of concurrent requests up to the specified value. + + This method: + 1. Creates new request instances based on the request type + 2. Manages HTTP sessions for HTTP requests + 3. Spawns new gevent tasks for each request + 4. 
Tracks running requests and their tasks + + Args: + ramp_up_value (int): Target number of concurrent requests to achieve + """ + if self.scenario_config.run_once: + self.running_request_tasks = [] + session = None + current_concurrency = 0 + total_request_configs = len(self.scenario_config.request_configs) + request_type_idx = self.last_request_idx % total_request_configs + + while current_concurrency < ramp_up_value: + current_request_type = self.scenario_config.request_configs[ + request_type_idx + ] + if not self.scenario_config.iterate_through_requests: + iterator = RequestIterator( + request_types=[ + self.scenario_config.request_configs[request_type_idx] + ] + ) + else: + iterator = self.iterator + if current_request_type.type == RequestType.http: + if session is None: + session = HTTPSession( + pool_connections=ramp_up_value, pool_maxsize=ramp_up_value + ) + request = HttpRequest( + scenario_name=self.scenario_config.name, + session=session, + run_once=self.scenario_config.run_once, + iterator=iterator, + ) + elif current_request_type.type == RequestType.websocket: + request = SocketRequest( + scenario_name=self.scenario_config.name, + run_once=self.scenario_config.run_once, + iterator=iterator, + ) + else: + request = _Request( + scenario_name=self.scenario_config.name, + run_once=self.scenario_config.run_once, + iterator=iterator, + ) + self.running_request_tasks.append(gevent.spawn(request.run)) + current_concurrency += 1 + self.requests.append(request) + if len(self.running_request_tasks) >= self.scenario_config.max_concurrency: + return + self.last_request_idx = request_type_idx + + request_type_idx += 1 + request_type_idx %= total_request_configs + + def execute(self): + """ + Execute the test scenario with configured ramp-up behavior. + + This method: + 1. Starts with minimum concurrency + 2. Gradually increases concurrency based on ramp-up configuration + 3. Handles both run-once and continuous execution modes + 4. Manages wait times between ramp-ups + 5. Provides detailed logging of execution progress + """ + info( + f"Starting the testiung with minimum concurrency i.e., {self.scenario_config.min_concurrency}, scenario: {self.scenario_config.name}" + ) + + # Current index of ramp up and ramp up wait array. + ramp_up_idx = 0 + ramp_up_wait_idx = 0 + + # Starting with minimum no. of requests + self.do_ramp_up(self.scenario_config.min_concurrency) + if not self.scenario_config.run_once: + gevent.sleep(self.scenario_config.ramp_up_wait[ramp_up_wait_idx]) + else: + gevent.wait(self.running_request_tasks) + + # Starting request with ramp up and ramp up wait. + while not self.stop_scenario_execution: + # Increasing ramp_up_wait_index + ramp_up_wait_idx += 1 + if ramp_up_wait_idx >= len(self.scenario_config.ramp_up_wait): + ramp_up_wait_idx = len(self.scenario_config.ramp_up_wait) - 1 + + # Calculating by how much we have to ramp up in this iteration. + if self.scenario_config.multiply: + # Suppose we want to ramp up the total requests by 2x and there are already x requests running in an infinite loop. + # Then, total requests need to be added is 2x = already_running_request(x) * (multiplication_facotr(2) -1 ) to make the concurrency 2x. + if not self.scenario_config.run_once: + ramp_up_val = len(self.running_request_tasks) * ( + self.scenario_config.ramp_up[ramp_up_idx] - 1 + ) + + # Suppose we want to ramp up the total requests by 2x and there are already x requests with run_once True. + # That means we are ramping up after the requests are completed. 
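
Reduced to a single helper, the ramp-up arithmetic worked through in these comments might look like the following sketch (function and parameter names are illustrative, not framework API):

```python
def next_ramp_up(running: int, factor: int, multiply: bool, run_once: bool) -> int:
    """How many new requests to start so total concurrency matches the ramp-up step."""
    if multiply:
        # Continuous mode: x requests already loop forever, so adding x*(factor-1) reaches factor*x.
        # Run-once mode: the previous requests have finished, so start factor*x fresh ones.
        return running * (factor - 1) if not run_once else running * factor
    # Additive mode: either add `factor` new requests, or restart the finished ones plus `factor`.
    return factor if not run_once else running + factor


assert next_ramp_up(running=4, factor=2, multiply=True, run_once=False) == 4   # 4 -> 8 total
assert next_ramp_up(running=4, factor=2, multiply=True, run_once=True) == 8    # start 8 fresh
assert next_ramp_up(running=4, factor=3, multiply=False, run_once=False) == 3  # add 3
assert next_ramp_up(running=4, factor=3, multiply=False, run_once=True) == 7   # restart 4, add 3
```
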
+ # Then, total requests needs to be added is 2x = already_running_request(x) * (multiplication_facotr(2)) to make the concurrency 2x. + else: + ramp_up_val = len(self.running_request_tasks) * ( + self.scenario_config.ramp_up[ramp_up_idx] + ) + + else: + if not self.scenario_config.run_once: + ramp_up_val = self.scenario_config.ramp_up[ramp_up_idx] + else: + ramp_up_val = ( + len(self.running_request_tasks) + + self.scenario_config.ramp_up[ramp_up_idx] + ) + # Ramping up by ramp_up nos. + if len(self.running_request_tasks) < self.scenario_config.max_concurrency: + # Logging before ramp_up. + table = [ + [ + len(self.running_request_tasks), + self.scenario_config.name, + ramp_up_val, + ] + ] + info( + tabulate( + table, + headers=[ + "Current no. of requests", + "Scenario Name", + "Next ramp up value", + ], + ) + ) + self.do_ramp_up(ramp_up_value=ramp_up_val) + + if self.stop_scenario_execution: + self.stop_scenario() + break + + if not self.scenario_config.run_once: + # Waiting for ramp_up wait secs before ramping up + gevent.sleep(self.scenario_config.ramp_up_wait[ramp_up_wait_idx]) + else: + # waitng for the running requests to finish. + gevent.wait(self.running_request_tasks) + + # Increasing ramp_up_index + ramp_up_idx += 1 + if ramp_up_idx >= len(self.scenario_config.ramp_up): + ramp_up_idx = len(self.scenario_config.ramp_up) - 1 + + if self.stop_scenario_execution: + self.stop_scenario() + + if len(self.running_request_tasks): + gevent.wait(self.running_request_tasks) + + def stop_scenario(self): + """ + Stop the scenario execution gracefully. + + This method: + 1. Sets the stop flag for the scenario + 2. Signals all running requests to stop + 3. Ensures clean shutdown of all test activities + """ + info(f"Stopping scenario {self.scenario_config.name}") + self.stop_scenario_execution = True + for request in self.requests: + request.stop_request = True diff --git a/src/gradual/runners/session.py b/src/gradual/runners/session.py new file mode 100644 index 0000000..ed34c30 --- /dev/null +++ b/src/gradual/runners/session.py @@ -0,0 +1,46 @@ +""" +The session module provides the HTTPSession class which extends the requests.Session +class to provide connection pooling and reuse for HTTP requests during stress testing. +""" + +from requests import Session +from requests.adapters import HTTPAdapter + + +class HTTPSession(Session): + """ + Custom HTTP session class with connection pooling capabilities. + + This class extends requests.Session to provide efficient connection pooling + for HTTP requests. It configures the underlying HTTPAdapter with specified + pool sizes for both HTTP and HTTPS connections. + + Attributes: + adapter (HTTPAdapter): The configured HTTP adapter with connection pooling + """ + + def __init__(self, pool_connections, pool_maxsize, *args, **kwargs): + """ + Initialize the HTTP session with connection pooling configuration. + + Args: + pool_connections (int): Number of connection pools to maintain + pool_maxsize (int): Maximum number of connections per pool + *args: Additional positional arguments for Session initialization + **kwargs: Additional keyword arguments for Session initialization + """ + super().__init__(*args, **kwargs) + self.adapter = HTTPAdapter( + pool_connections=pool_connections, pool_maxsize=pool_maxsize + ) + self.mount("http://", self.adapter) + self.mount("https://", self.adapter) + + def close(self): + """ + Close the HTTP session and its connection pools. 
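
Mounting a single `HTTPAdapter` with explicit pool sizes is the standard `requests` way to share and reuse TCP connections across many concurrent calls. A small usage sketch of a pooled session along these lines; the pool sizes and target URL are placeholders:

```python
from requests import Session
from requests.adapters import HTTPAdapter

# One adapter, sized for the expected concurrency, mounted for both schemes.
session = Session()
adapter = HTTPAdapter(pool_connections=50, pool_maxsize=50)
session.mount("http://", adapter)
session.mount("https://", adapter)

try:
    # All requests made through this session reuse connections from the pool.
    response = session.get("https://httpbin.org/get", timeout=10)
    print(response.status_code)
finally:
    session.close()  # also closes the adapter's pooled connections
```
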
+ + This method ensures proper cleanup of all connection pools and resources + associated with this session. + """ + self.adapter.close() diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..c546490 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,108 @@ +""" +Configuration file for pytest that sets up gevent monkey patching. +This ensures all tests run with proper gevent compatibility. +""" + +# Monkey patch must happen before any other imports +import gevent.monkey +import sys + +# Debug logging for monkey patching +print("Python version:", sys.version) +print("Gevent version:", gevent.__version__) + +# Apply monkey patching with explicit modules +gevent.monkey.patch_all( + socket=True, + dns=True, + time=True, + select=True, + thread=True, + os=True, + ssl=True, + subprocess=True, + sys=True, + builtins=True, + signal=True, + queue=True, + contextvars=True, + _threading_local=True +) + +# Now we can safely import other modules +import pytest +import logging +import socket +import threading +import time +import gevent + +# Configure logging to prevent duplicate output +logging.basicConfig(level=logging.INFO) +# Remove any existing handlers to prevent duplicate logging +for handler in logging.root.handlers[:]: + logging.root.removeHandler(handler) +# Add a single handler +handler = logging.StreamHandler() +handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s')) +logging.root.addHandler(handler) +logger = logging.getLogger(__name__) + +class MonkeyPatchingError(Exception): + """Raised when gevent monkey patching is not properly applied.""" + pass + +def verify_monkey_patching(): + """Verify that gevent monkey patching has been applied correctly. + + Raises: + MonkeyPatchingError: If any required module is not properly patched. + """ + errors = [] + + # Check if socket module is patched + try: + s = socket.socket() + if not isinstance(s, gevent.socket.socket): + errors.append("Socket module is not monkey patched!") + else: + logger.info("Socket module is monkey patched ✓") + except Exception as e: + errors.append(f"Socket module check failed: {e}") + + # Check if threading module is patched + try: + t = threading.Thread() + if not isinstance(t, gevent.threading.Thread): + errors.append("Threading module is not monkey patched!") + else: + logger.info("Threading module is monkey patched ✓") + except Exception as e: + errors.append(f"Threading module check failed: {e}") + + # Check if time module is patched + try: + if time.sleep is not gevent.sleep: + errors.append("Time module is not monkey patched!") + else: + logger.info("Time module is monkey patched ✓") + except Exception as e: + errors.append(f"Time module check failed: {e}") + + if errors: + error_msg = "\n".join(errors) + logger.error(error_msg) + raise MonkeyPatchingError(error_msg) + +# Verify patching immediately +logger.info("Verifying initial monkey patching...") +verify_monkey_patching() + +@pytest.fixture(autouse=True) +def verify_patching_before_test(): + """Verify monkey patching before each test. + + Raises: + MonkeyPatchingError: If any required module is not properly patched. + """ + verify_monkey_patching() \ No newline at end of file diff --git a/tests/test_http.py b/tests/test_http.py new file mode 100644 index 0000000..fd139e7 --- /dev/null +++ b/tests/test_http.py @@ -0,0 +1,234 @@ +""" +Tests for the HTTP request functionality in the stress testing framework. 
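
gevent can also report what it has patched, which gives a terser check than isinstance-based probes. A short, purely illustrative sketch using `gevent.monkey.is_module_patched`; the explicit checks in the conftest are what the test suite actually runs:

```python
import gevent.monkey

gevent.monkey.patch_all()

# Ask gevent which modules it has replaced with cooperative versions.
for module in ("socket", "threading", "time", "ssl"):
    status = "patched" if gevent.monkey.is_module_patched(module) else "NOT patched"
    print(f"{module}: {status}")
```
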
+""" + +import pytest +from unittest.mock import Mock, patch +from requests import Response +from gradual.runners.request.Http import HttpRequest +from gradual.runners.session import HTTPSession +from gradual.runners.iterators import RequestIterator +from gradual.configs.request import RequestConfig + + +def has_kerberos(): + """Check if requests_kerberos is available.""" + try: + import requests_kerberos + + return True + except ImportError: + return False + + +@pytest.fixture +def mock_session(): + """Create a mock HTTPSession.""" + session = Mock(spec=HTTPSession) + session.get = Mock(return_value=Mock(spec=Response)) + session.post = Mock(return_value=Mock(spec=Response)) + session.put = Mock(return_value=Mock(spec=Response)) + session.delete = Mock(return_value=Mock(spec=Response)) + session.close = Mock() + return session + + +@pytest.fixture +def mock_iterator(): + """Create a mock RequestIterator.""" + iterator = Mock(spec=RequestIterator) + iterator.get_next_request = Mock(return_value=Mock(spec=RequestConfig)) + return iterator + + +@pytest.fixture +def http_request(mock_session, mock_iterator): + """Create an HttpRequest instance with mocked dependencies.""" + return HttpRequest( + scenario_name="test_scenario", + session=mock_session, + run_once=True, + iterator=mock_iterator, + ) + + +def test_initialization(http_request): + """Test HttpRequest initialization.""" + assert http_request.scenario_name == "test_scenario" + assert http_request.run_once is True + assert http_request._kerberos_available is None + assert http_request._kerberos_auth is None + + +@pytest.mark.skipif(not has_kerberos(), reason="requests_kerberos not installed") +@pytest.mark.parametrize("auth,expected", [("kerb", True), ("none", False)]) +def test_requires_kerberos(http_request, auth, expected): + """Test Kerberos requirement check.""" + request_type = Mock(spec=RequestConfig) + request_type.auth = auth + assert http_request.requires_kerberos(request_type) is expected + + +@pytest.mark.skipif(not has_kerberos(), reason="requests_kerberos not installed") +@patch("requests_kerberos.HTTPKerberosAuth") +def test_kerberos_available(mock_kerberos_auth, http_request): + """Test when Kerberos is available.""" + mock_auth = Mock() + mock_kerberos_auth.return_value = mock_auth + assert http_request._check_kerberos_availability() is True + assert http_request._kerberos_available is True + assert http_request._kerberos_auth == mock_auth + assert http_request.get_kerberos_auth() == mock_auth + + +@pytest.mark.skipif(not has_kerberos(), reason="requests_kerberos not installed") +@patch("requests_kerberos.HTTPKerberosAuth", side_effect=ImportError) +def test_kerberos_not_available(mock_kerberos_auth, http_request): + """Test when Kerberos is not available.""" + assert http_request._check_kerberos_availability() is False + assert http_request._kerberos_available is False + assert http_request._kerberos_auth is None + assert http_request.get_kerberos_auth() is None + + +@pytest.mark.parametrize( + "method,func", + [ + ("GET", "get"), + ("POST", "post"), + ("PUT", "put"), + ("DELETE", "delete"), + ], +) +def test_send_request_valid_methods(http_request, mock_session, method, func): + """Test sending a request with valid methods.""" + request_type = Mock(spec=RequestConfig) + request_type.http_method = method + req_kwargs = {"url": "http://test.com"} + http_request.send_request(request_type, req_kwargs) + getattr(mock_session, func).assert_called_once_with(**req_kwargs) + + +def 
test_send_request_invalid_method(http_request): + """Test sending a request with invalid method.""" + request_type = Mock(spec=RequestConfig) + request_type.http_method = "INVALID" + req_kwargs = {"url": "http://test.com"} + with pytest.raises(ValueError, match="Unsupported HTTP method"): + http_request.send_request(request_type, req_kwargs) + + +def test_on_request_completion(http_request): + """Test request completion handling.""" + request_type = Mock(spec=RequestConfig) + request_type.name = "test_request" + request_type.url = "http://test.com" + request_type.context = {} + request_type.expected_response_time = 1000 + response = Mock(spec=Response) + response.status_code = 200 + params = {"iid": "test-id"} + http_request.stats_instance = Mock() + http_request.on_request_completion( + request_type=request_type, + response=response, + response_time=500, + start_time=1000, + end_time=1500, + params=params, + ) + http_request.stats_instance.persist_stats.assert_called_once() + call_args = http_request.stats_instance.persist_stats.call_args[0][0] + print(call_args) + assert call_args["request_name"] == "test_request" + assert call_args["status_code"] == 200 + assert call_args["response_time"] == 500 + + +@pytest.mark.skipif(not has_kerberos(), reason="requests_kerberos not installed") +@patch("gevent.spawn") +def test_run_with_kerberos_available(mock_spawn, http_request, mock_session): + """Test running with Kerberos available.""" + request_type = Mock(spec=RequestConfig) + request_type.name = "test_request" + request_type.auth = "kerb" + request_type.http_method = "GET" + request_type.url = "http://test.com" + request_type.params = {} + http_request.iterator.get_next_request.return_value = request_type + mock_auth = Mock() + with patch("requests_kerberos.HTTPKerberosAuth", return_value=mock_auth): + http_request.run() + mock_session.get.assert_called_once() + call_args = mock_session.get.call_args[1] + assert call_args["auth"] == mock_auth + assert "X-ANTICSRF-HEADER" in call_args["headers"] + assert "Content-type" in call_args["headers"] + + +@pytest.mark.skipif(not has_kerberos(), reason="requests_kerberos not installed") +@patch("gevent.spawn") +def test_run_with_kerberos_unavailable(mock_spawn, http_request, mock_session): + """Test running with Kerberos unavailable.""" + request_type = Mock(spec=RequestConfig) + request_type.name = "test_request" + request_type.auth = "kerb" + request_type.http_method = "GET" + request_type.url = "http://test.com" + request_type.params = {} + http_request.iterator.get_next_request.return_value = request_type + with patch("requests_kerberos.HTTPKerberosAuth", side_effect=ImportError): + http_request.run() + mock_session.get.assert_not_called() + + +@pytest.mark.skipif(not has_kerberos(), reason="requests_kerberos not installed") +@patch("gevent.spawn") +def test_run_without_kerberos(mock_spawn, http_request, mock_session): + """Test running without Kerberos requirement.""" + request_type = Mock(spec=RequestConfig) + request_type.name = "test_request" + request_type.auth = "none" + request_type.http_method = "GET" + request_type.url = "http://test.com" + request_type.params = {} + http_request.iterator.get_next_request.return_value = request_type + http_request.run() + mock_session.get.assert_called_once() + call_args = mock_session.get.call_args[1] + assert "auth" not in call_args + assert "X-ANTICSRF-HEADER" in call_args["headers"] + assert "Content-type" in call_args["headers"] + + +def test_run_with_error(http_request, mock_session): + """Test 
running with request error.""" + request_type = Mock(spec=RequestConfig) + request_type.name = "test_request" + request_type.http_method = "GET" + request_type.url = "http://test.com" + request_type.params = {} + http_request.iterator.get_next_request.return_value = request_type + mock_session.get.side_effect = Exception("Test error") + http_request.run() + mock_session.close.assert_called_once() + + +@pytest.mark.skipif(not has_kerberos(), reason="requests_kerberos not installed") +@patch("gevent.spawn") +def test_run_with_kerberos_unavailable_logs_error( + mock_spawn, http_request, mock_session, caplog +): + """Test that running with Kerberos unavailable logs an error and skips the request.""" + request_type = Mock(spec=RequestConfig) + request_type.name = "test_request" + request_type.auth = "kerb" + request_type.http_method = "GET" + request_type.url = "http://test.com" + request_type.params = {} + http_request.iterator.get_next_request.return_value = request_type + with patch("requests_kerberos.HTTPKerberosAuth", side_effect=ImportError): + http_request.run() + mock_session.get.assert_not_called() + # Assert the log message is present + assert any("Skipping request" in record.message for record in caplog.records) diff --git a/tests/test_iterators.py b/tests/test_iterators.py new file mode 100644 index 0000000..cd9d52d --- /dev/null +++ b/tests/test_iterators.py @@ -0,0 +1,112 @@ +import pytest +from unittest.mock import Mock +from gradual.runners.iterators import RequestIterator +from gradual.configs.request import RequestConfig + + +@pytest.fixture +def mock_request_configs(): + """Create mock request configs for testing.""" + return [ + Mock(spec=RequestConfig, name="request1"), + Mock(spec=RequestConfig, name="request2"), + Mock(spec=RequestConfig, name="request3"), + ] + + +@pytest.fixture +def iterator(mock_request_configs): + return RequestIterator(request_types=mock_request_configs) + + +def test_iterator_initialization(mock_request_configs): + """Test that RequestIterator initializes correctly with request configs.""" + iterator = RequestIterator(request_types=mock_request_configs) + + assert iterator.request_types == mock_request_configs + assert iterator.request_type_index == 0 + assert iterator.current is None + + +def test_get_next_request(iterator, mock_request_configs): + """Test getting next request in round-robin fashion.""" + # Get first request + request1 = iterator.get_next_request() + assert request1 == mock_request_configs[0] + assert iterator.current == 0 + assert iterator.request_type_index == 1 + + # Get second request + request2 = iterator.get_next_request() + assert request2 == mock_request_configs[1] + assert iterator.current == 1 + assert iterator.request_type_index == 2 + + # Get third request + request3 = iterator.get_next_request() + assert request3 == mock_request_configs[2] + assert iterator.current == 2 + assert iterator.request_type_index == 0 # Should wrap around + + +def test_get_next_request_wraparound(iterator, mock_request_configs): + """Test that iterator wraps around to beginning after last request.""" + # Get all requests + for _ in range(len(mock_request_configs)): + iterator.get_next_request() + + # Next request should be the first one again + request = iterator.get_next_request() + assert request == mock_request_configs[0] + assert iterator.request_type_index == 1 + + +def test_current_request_property(iterator, mock_request_configs): + """Test current_request property behavior.""" + # Initially should be None + assert 
iterator.current_request is None + + # After getting first request + iterator.get_next_request() + assert iterator.current_request == mock_request_configs[0] + + # After getting second request + iterator.get_next_request() + assert iterator.current_request == mock_request_configs[1] + + +def test_iterator_with_single_request(): + """Test iterator behavior with single request config.""" + single_request = Mock(spec=RequestConfig, name="single_request") + iterator = RequestIterator(request_types=[single_request]) + + # Should always return the same request + for _ in range(3): + request = iterator.get_next_request() + assert request == single_request + + +def test_iterator_with_empty_list(): + """Test iterator behavior with empty request types list.""" + iterator = RequestIterator(request_types=[]) + + with pytest.raises(IndexError): + iterator.get_next_request() + + +def test_iterator_current_index_consistency(iterator, mock_request_configs): + """Test that current index is consistent with returned requests.""" + for i in range(len(mock_request_configs)): + request = iterator.get_next_request() + assert request == mock_request_configs[iterator.current] + + +def test_iterator_multiple_cycles(iterator, mock_request_configs): + """Test iterator behavior over multiple complete cycles.""" + num_cycles = 3 + total_requests = num_cycles * len(mock_request_configs) + + for i in range(total_requests): + request = iterator.get_next_request() + expected_index = i % len(mock_request_configs) + assert request == mock_request_configs[expected_index] diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py new file mode 100644 index 0000000..f70a7c1 --- /dev/null +++ b/tests/test_orchestrator.py @@ -0,0 +1,115 @@ +from logging import info +import pytest +from unittest.mock import Mock, patch +import gevent +from gradual.base.orchestrator import Orchestrator +from gradual.runners.phase import Phase +from gradual.configs.scenario import ScenarioConfig +from gradual.configs.request import RequestConfig +from gradual.configs.phase import PhaseConfig +import time + + +@pytest.fixture +def mock_request_config(): + """Create a mock request config for testing.""" + return RequestConfig( + name="test_request", + params={}, + http_method="GET", + expected_response_time=1.0, + url=None, + type="Something", + ) + + +@pytest.fixture +def mock_scenario_config(mock_request_config): + """Create a mock scenario config for testing.""" + return ScenarioConfig( + name="test_scenario", + min_concurrency=2, + max_concurrency=10, + ramp_up=[2, 3, 4], + ramp_up_wait=[0.001], + multiply=False, + run_once=False, + iterate_through_requests=False, + request_configs=[mock_request_config], + ) + + +@pytest.fixture +def mock_parser(mock_scenario_config): + """Create a mock parser with proper phase configs.""" + # Create the phases list first + phases = [ + PhaseConfig(name="phase1", scenario_config=[mock_scenario_config], runtime=0.1), + PhaseConfig(name="phase2", scenario_config=[mock_scenario_config], runtime=0.1), + ] + # Create a mock with spec_set to properly handle attributes + parser = Mock( + spec_set=["phases", "phase_wait", "run_name", "read_configs", "phase"] + ) + parser.phases = phases + parser.phase_wait = 0.1 + parser.run_name = "test_run" + # Mock the read_configs method to do nothing + parser.read_configs = Mock() + # Mock the phase property to return the phases list + parser.phase = phases + return parser + + +@pytest.fixture +def orchestrator(mock_parser): + with patch("gradual.base.orchestrator.Parser", 
return_value=mock_parser): + orch = Orchestrator("test_config.json", "request_configs.json") + orch.parser = mock_parser + return orch + + +def test_orchestrator_initialization(): + """Test that Orchestrator initializes correctly with config paths.""" + with patch("gradual.base.orchestrator.Parser") as mock_parser: + orch = Orchestrator("test_config.json", "request_configs.json") + mock_parser.assert_called_once_with("test_config.json", "request_configs.json") + assert orch.test_config_file_path == "test_config.json" + assert orch.request_configs_path == "request_configs.json" + + +def test_start_stress_test(orchestrator, mock_parser): + """Test that stress test execution follows the correct sequence.""" + orchestrator.start_stress_test() + + # Verify phases were created and executed + assert len(orchestrator.parser.phases) == 2 + + +def test_phase_wait_time(orchestrator, mock_parser): + """Test that the correct wait time is applied between phases.""" + start_time = time.time() + orchestrator.start_stress_test() + end_time = time.time() + + # Verify that the total time includes the wait time between phases + # We expect: 2 phases with 0.1 second runtime each + 0.1 second wait between them + expected_min_time = 0.3 # 2 phases + 1 wait + assert end_time - start_time >= expected_min_time + + +def test_phase_execution_order(orchestrator, mock_parser): + """Test that phases are executed in the correct order.""" + phase_execution_order = [] + + def track_phase_execution(self): + phase_execution_order.append(self.phase_config.name) + return None + + # Create a patch for Phase.execute that will be applied to each instance + with patch.object(Phase, "execute", autospec=True) as mock_execute: + mock_execute.side_effect = track_phase_execution + orchestrator.start_stress_test() + + # Verify phases were executed in order + assert phase_execution_order == ["phase1", "phase2"] diff --git a/tests/test_parser.py b/tests/test_parser.py new file mode 100644 index 0000000..0b2fcc6 --- /dev/null +++ b/tests/test_parser.py @@ -0,0 +1,327 @@ +import pytest +from unittest.mock import patch, mock_open +from gradual.configs.parser import Parser +from gradual.configs.phase import PhaseConfig +from gradual.configs.scenario import ScenarioConfig +from gradual.configs.request import RequestConfig + +# filepath: src/gradual/configs/test_parser.py + + +@pytest.fixture +def mock_yaml_data(): + return { + "runs": { + "name": "test_run", + "wait_between_phases": 2, + "phases": { + "phase1": { + "scenarios": { + "scenario1": { + "min_concurrency": 1, + "max_concurrency": 10, + "ramp_up_add": [1, 2], + "ramp_up_wait": [0.1, 0.2], + "requests": ["request1", "request2"], + } + }, + "run_time": 60, + } + }, + } + } + + +@pytest.fixture +def mock_request_yaml_data(): + return { + "requests": { + "request1": { + "url": "http://example.com", + "params": {"key": "value"}, + "method": "get", + "expected_response_time": 200, + }, + "request2": { + "url": "http://example2.com", + "params": {"key2": "value2"}, + "method": "post", + "expected_response_time": 300, + }, + } + } + + +@patch("builtins.open", new_callable=mock_open) +@patch("yaml.safe_load") +@patch("gradual.configs.parser.assert_not_empty") +def test_read_configs( + mock_assert_not_empty, + mock_yaml_safe_load, + mock_open_file, + mock_yaml_data, + mock_request_yaml_data, +): + # Mock YAML data for test_config_file_path + mock_yaml_safe_load.side_effect = [mock_yaml_data, mock_request_yaml_data] + + parser = Parser("test_config_path.yaml", "request_configs_path.yaml") + 
parser.read_configs() + + # Assertions for run_name and phase_wait + assert parser.run_name == "test_run" + assert parser.phase_wait == 2 + + # Assertions for phases + assert len(parser.phases) == 1 + phase = parser.phases[0] + assert isinstance(phase, PhaseConfig) + assert phase.name == "phase1" + assert len(phase.scenario_config) == 1 + + # Assertions for scenario_config + scenario = phase.scenario_config[0] + assert isinstance(scenario, ScenarioConfig) + assert scenario.name == "scenario1" + assert scenario.min_concurrency == 1 + assert scenario.max_concurrency == 10 + assert scenario.ramp_up == [1, 2] + assert scenario.ramp_up_wait == [0.1, 0.2] + assert len(scenario.request_configs) == 2 + + # Assertions for request_configs + request1 = scenario.request_configs[0] + assert isinstance(request1, RequestConfig) + assert request1.name == "request1" + assert request1.url == "http://example.com" + assert request1.params == {"key": "value"} + assert request1.http_method == "get" + assert request1.expected_response_time == 200 + + request2 = scenario.request_configs[1] + assert isinstance(request2, RequestConfig) + assert request2.name == "request2" + assert request2.url == "http://example2.com" + assert request2.params == {"key2": "value2"} + assert request2.http_method == "post" + assert request2.expected_response_time == 300 + + # Verify assert_not_empty requests + assert mock_assert_not_empty.call_count > 0 + + +@patch("builtins.open", new_callable=mock_open) +@patch("yaml.safe_load") +@patch("gradual.configs.parser.assert_not_empty") +def test_parser_multiple_phases_and_scenarios( + mock_assert_not_empty, mock_yaml_safe_load, mock_open_file +): + # Mock YAML data for multiple phases and scenarios + mock_yaml_data = { + "runs": { + "name": "multi_run", + "wait_between_phases": 1, + "phases": { + "phase1": { + "scenarios": { + "scenario1": { + "min_concurrency": 1, + "max_concurrency": 5, + "ramp_up_add": [1], + "ramp_up_wait": [0.1], + "requests": ["request1"], + }, + "scenario2": { + "min_concurrency": 2, + "max_concurrency": 10, + "ramp_up_add": [2, 3], + "ramp_up_wait": [0.2, 0.3], + "requests": ["request2"], + }, + }, + "run_time": 30, + }, + "phase2": { + "scenarios": { + "scenario3": { + "min_concurrency": 3, + "max_concurrency": 15, + "ramp_up_add": [4], + "ramp_up_wait": [0.4], + "requests": ["request3"], + } + }, + "run_time": 40, + }, + }, + } + } + mock_request_yaml_data = { + "requests": { + "request1": { + "url": "http://a.com", + "params": {}, + "method": "get", + "expected_response_time": 100, + }, + "request2": { + "url": "http://b.com", + "params": {}, + "method": "post", + "expected_response_time": 200, + }, + "request3": { + "url": "http://c.com", + "params": {}, + "method": "put", + "expected_response_time": 300, + }, + } + } + mock_yaml_safe_load.side_effect = [mock_yaml_data, mock_request_yaml_data] + parser = Parser("test_config_path.yaml", "request_configs_path.yaml") + parser.read_configs() + assert parser.run_name == "multi_run" + assert parser.phase_wait == 1 + assert len(parser.phases) == 2 + phase_names = {s.name for s in parser.phases} + assert phase_names == {"phase1", "phase2"} + # Check scenarios in phase1 + phase1 = next(s for s in parser.phases if s.name == "phase1") + assert len(phase1.scenario_config) == 2 + scenario_names = {c.name for c in phase1.scenario_config} + assert scenario_names == {"scenario1", "scenario2"} + # Check scenarios in phase2 + phase2 = next(s for s in parser.phases if s.name == "phase2") + assert len(phase2.scenario_config) == 1 
+ assert phase2.scenario_config[0].name == "scenario3" + # Check all scenarios and calls + for scenario in phase1.scenario_config: + for request in scenario.request_configs: + assert request.url.startswith("http://") + + +@patch("builtins.open", new_callable=mock_open) +@patch("yaml.safe_load") +@patch("gradual.configs.parser.assert_not_empty") +def test_parser_missing_required_fields_raises( + mock_assert_not_empty, mock_yaml_safe_load, mock_open_file +): + # Missing 'runs' key + mock_yaml_safe_load.side_effect = [{}, {}] + parser = Parser("test_config_path.yaml", "request_configs_path.yaml") + with pytest.raises(Exception): + parser.read_configs() + + +@patch("builtins.open", new_callable=mock_open) +@patch("yaml.safe_load") +@patch("gradual.configs.parser.assert_not_empty") +def test_parser_invalid_ramp_up_type_raises( + mock_assert_not_empty, mock_yaml_safe_load, mock_open_file +): + # ramp_up_add is a string instead of a list + mock_yaml_data = { + "runs": { + "name": "test_run", + "wait_between_phases": 2, + "phases": { + "phase1": { + "scenarios": { + "scenario1": { + "min_concurrency": 1, + "max_concurrency": 10, + "ramp_up_add": "not_a_list", + "ramp_up_wait": [0.1], + "requests": ["request1"], + } + }, + "run_time": 60, + } + }, + } + } + mock_request_yaml_data = { + "requests": { + "request1": { + "url": "http://example.com", + "params": {}, + "method": "get", + "expected_response_time": 100, + }, + } + } + mock_yaml_safe_load.side_effect = [mock_yaml_data, mock_request_yaml_data] + parser = Parser("test_config_path.yaml", "request_configs_path.yaml") + with pytest.raises(Exception): + parser.read_configs() + + +@patch("builtins.open", new_callable=mock_open) +@patch("yaml.safe_load") +@patch("gradual.configs.parser.assert_not_empty") +def test_parser_optional_fields( + mock_assert_not_empty, mock_yaml_safe_load, mock_open_file +): + mock_yaml_data = { + "runs": { + "name": "test_run", + "wait_between_phases": 2, + "phases": { + "phase1": { + "scenarios": { + "scenario1": { + "min_concurrency": 1, + "max_concurrency": 10, + "ramp_up_multiply": [2, 3], + "ramp_up_wait": [0.1, 0.2], + "requests": ["request1"], + "run_once": True, + "iterate_through_requests": True, + }, + "scenario2": { + "min_concurrency": 2, + "max_concurrency": 5, + "ramp_up_add": [1], + "ramp_up_wait": [0.1], + "requests": ["request2"], + # run_once and iterate_through_requests omitted + }, + }, + "run_time": 60, + } + }, + } + } + mock_request_yaml_data = { + "requests": { + "request1": { + "url": "http://example.com", + "params": {"key": "value"}, + "method": "get", + "expected_response_time": 200, + }, + "request2": { + "url": "http://example2.com", + "params": {"key2": "value2"}, + "method": "post", + "expected_response_time": 300, + }, + } + } + mock_yaml_safe_load.side_effect = [mock_yaml_data, mock_request_yaml_data] + parser = Parser("test_config_path.yaml", "request_configs_path.yaml") + parser.read_configs() + scenarios = parser.phases[0].scenario_config + scenario1 = next(c for c in scenarios if c.name == "scenario1") + scenario2 = next(c for c in scenarios if c.name == "scenario2") + # scenario1: all optional fields present + assert scenario1.multiply is True + assert scenario1.ramp_up == [2, 3] + assert scenario1.run_once is True + assert scenario1.iterate_through_requests is True + # scenario2: optional fields omitted, should use defaults + assert scenario2.multiply is False + assert scenario2.ramp_up == [1] + assert scenario2.run_once is False + assert scenario2.iterate_through_requests is False 
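
Putting together the shapes exercised by these parser tests, the two configuration files passed to `Parser` might look roughly like the following; all names and values are illustrative.

```yaml
# test_config.yaml
runs:
  name: example_run
  wait_between_phases: 2
  phases:
    phase1:
      run_time: 60
      scenarios:
        scenario1:
          min_concurrency: 1
          max_concurrency: 10
          ramp_up_add: [1, 2]        # or ramp_up_multiply: [2, 3]
          ramp_up_wait: [0.1, 0.2]
          requests: [request1]
          run_once: false
          iterate_through_requests: false
```

```yaml
# request_configs.yaml
requests:
  request1:
    url: http://example.com
    params:
      key: value
    method: get
    expected_response_time: 200
```
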
diff --git a/tests/test_phase.py b/tests/test_phase.py new file mode 100644 index 0000000..6cd8425 --- /dev/null +++ b/tests/test_phase.py @@ -0,0 +1,128 @@ +import pytest +from unittest.mock import Mock, patch +import gevent +from gradual.runners.phase import Phase +from gradual.configs.phase import PhaseConfig + + +@pytest.fixture +def mock_phase_config(): + config = Mock(spec=PhaseConfig) + config.name = "test_phase" + config.phase_runtime = 5 + # Provide a list of mocks with the right spec and attributes + mock_cat_config = Mock() + mock_cat_config.iterate_through_requests = False + mock_cat_config.min_concurrency = 1 + mock_cat_config.max_concurrency = 1 + mock_cat_config.ramp_up = [1] + mock_cat_config.ramp_up_wait = [0.1] + mock_cat_config.multiply = False + mock_cat_config.run_once = False + mock_cat_config.request_configs = [] + config.scenario_config = [mock_cat_config] + return config + + +def test_phase_initialization(mock_phase_config): + """Test that Phase initializes correctly with config and run name.""" + with ( + patch("gradual.runners.phase.Stats") as mock_stats, + patch("gradual.runners.phase.Runner") as mock_runner, + ): + phase = Phase(mock_phase_config, "test_run") + mock_stats.assert_called_once_with(mock_phase_config, "test_run") + mock_runner.assert_called_once_with(mock_phase_config.scenario_config) + assert phase.phase_config == mock_phase_config + assert phase.reporting_object == mock_stats.return_value + assert phase.runner == mock_runner.return_value + + +def test_phase_execution(mock_phase_config): + """Test normal phase execution without timeout.""" + with ( + patch("gradual.runners.phase.Stats"), + patch("gradual.runners.phase.Runner") as mock_runner, + ): + phase = Phase(mock_phase_config, "test_run") + phase.runner.start_test = Mock() + with ( + patch("gevent.spawn") as mock_spawn, + patch("gevent.wait") as mock_wait, + patch("gevent.Timeout") as mock_timeout, + ): + mock_task = Mock() + mock_spawn.return_value = mock_task + phase.execute() + mock_spawn.assert_called_once_with(phase.runner.start_test) + mock_wait.assert_called_once_with(objects=[mock_task]) + args, kwargs = mock_timeout.call_args + assert args[0] == mock_phase_config.phase_runtime + assert isinstance(args[1], TimeoutError) + assert str(args[1]) == "Phase exceeded runtime." 
+ + +def test_phase_timeout(mock_phase_config): + """Test phase execution with timeout.""" + with ( + patch("gradual.runners.phase.Stats"), + patch("gradual.runners.phase.Runner") as mock_runner, + ): + phase = Phase(mock_phase_config, "test_run") + phase.runner.stop_runner = Mock() + with ( + patch("gevent.spawn") as mock_spawn, + patch("gevent.wait", side_effect=TimeoutError()), + patch("gevent.Timeout") as mock_timeout, + ): + mock_task = Mock() + mock_spawn.return_value = mock_task + phase.execute() + phase.runner.stop_runner.assert_called_once() + + +def test_stop_phase(mock_phase_config): + """Test stopping a phase.""" + with ( + patch("gradual.runners.phase.Stats"), + patch("gradual.runners.phase.Runner") as mock_runner, + ): + phase = Phase(mock_phase_config, "test_run") + phase.runner.stop_runner = Mock() + phase.stop_phase() + phase.runner.stop_runner.assert_called_once() + + +def test_phase_execution_error_handling(mock_phase_config): + """Test error handling during phase execution.""" + with ( + patch("gradual.runners.phase.Stats"), + patch("gradual.runners.phase.Runner") as mock_runner, + ): + phase = Phase(mock_phase_config, "test_run") + phase.runner.stop_runner = Mock() + with ( + patch("gevent.spawn") as mock_spawn, + patch("gevent.wait", side_effect=Exception("Test error")), + patch("gevent.Timeout") as mock_timeout, + ): + mock_task = Mock() + mock_spawn.return_value = mock_task + with pytest.raises(Exception) as exc_info: + phase.execute() + assert str(exc_info.value) == "Test error" + phase.runner.stop_runner.assert_called_once() + + +def test_phase_runtime_validation(mock_phase_config): + """Test that phase runtime is properly validated.""" + mock_phase_config.phase_runtime = -1 + with pytest.raises(ValueError): + Phase(mock_phase_config, "test_run") + + +def test_phase_name_validation(mock_phase_config): + """Test that phase name is properly validated.""" + mock_phase_config.name = "" + with pytest.raises(ValueError): + Phase(mock_phase_config, "test_run") diff --git a/tests/test_runner.py b/tests/test_runner.py new file mode 100644 index 0000000..b4cd271 --- /dev/null +++ b/tests/test_runner.py @@ -0,0 +1,176 @@ +import pytest +from unittest.mock import Mock, patch +import gevent +from gradual.runners.runner import Runner +from gradual.configs.scenario import ScenarioConfig + + +@pytest.fixture +def mock_scenario_configs(): + """Create mock scenario configs for testing.""" + configs = [] + for i in range(2): + # Create a mock request config + mock_request_config = Mock() + mock_request_config.name = f"test_request_{i+1}" + mock_request_config.params = {} + mock_request_config.http_method = "GET" + mock_request_config.expected_response_time = 1.0 + mock_request_config.url = None + mock_request_config.type = None + + # Create the scenario config + config = Mock(spec=ScenarioConfig) + config.name = f"scenario{i+1}" + config.iterate_through_requests = False + config.min_concurrency = 1 + config.max_concurrency = 5 + config.ramp_up = [2, 3, 4] + config.ramp_up_wait = [0.1] + config.multiply = False + config.run_once = False + config.request_configs = [mock_request_config] + configs.append(config) + return configs + + +@pytest.fixture +def runner(mock_scenario_configs): + with patch("gradual.runners.runner.Scenario") as mock_scenario: + mock_scenario.return_value = Mock() + runner = Runner(mock_scenario_configs) + return runner + + +def test_runner_initialization(mock_scenario_configs): + """Test that Runner initializes correctly with scenario configs.""" + with 
patch("gradual.runners.runner.Scenario") as mock_scenario: + mock_scenario.return_value = Mock() + runner = Runner(mock_scenario_configs) + + assert len(runner.scenarios) == len(mock_scenario_configs) + assert len(runner.running_scenarios) == 0 + assert len(runner.running_scenarios_task) == 0 + + +def test_start_test(runner, mock_scenario_configs): + """Test starting test execution for all scenarios.""" + with patch("gevent.spawn") as mock_spawn, patch("gevent.wait") as mock_wait: + + mock_task = Mock() + mock_spawn.return_value = mock_task + + runner.start_test() + + # Verify each scenario was started + assert len(runner.running_scenarios) == len(mock_scenario_configs) + assert len(runner.running_scenarios_task) == len(mock_scenario_configs) + assert mock_spawn.call_count == len(mock_scenario_configs) + mock_wait.assert_called_once_with(runner.running_scenarios_task) + runner.stop_runner() + + +def test_stop_runner(runner, mock_scenario_configs): + """Test stopping all running scenarios.""" + # First start the test to populate running scenarios + with patch("gevent.spawn") as mock_spawn, patch("gevent.wait") as mock_wait: + + mock_task = Mock() + mock_spawn.return_value = mock_task + runner.start_test() + + # Now test stopping + with patch("gevent.wait") as mock_wait: # Mock wait to prevent LoopExit + runner.stop_runner() + + # Verify all scenarios were stopped + for scenario in runner.running_scenarios: + assert scenario.stop_scenario_execution is True + + +def test_runner_error_handling(runner, mock_scenario_configs): + """Test error handling during test execution.""" + with ( + patch("gevent.spawn") as mock_spawn, + patch("gevent.wait", side_effect=Exception("Test error")), + ): + + mock_task = Mock() + mock_spawn.return_value = mock_task + + with pytest.raises(Exception) as exc_info: + runner.start_test() + + assert str(exc_info.value) == "Test error" + + +def test_runner_empty_scenarios(): + """Test runner behavior with empty scenario list.""" + runner = Runner([]) + + assert len(runner.scenarios) == 0 + assert len(runner.running_scenarios) == 0 + assert len(runner.running_scenarios_task) == 0 + + +def test_runner_scenario_execution_order(runner, mock_scenario_configs): + """Test that scenarios are executed in the correct order.""" + execution_order = [] + + def mock_execute(): + execution_order.append(len(execution_order) + 1) + + # Mock the request class to prevent NotImplementedError + class MockRequest: + def __init__(self, *args, **kwargs): + pass + + def run(self): + pass + + # Mock the iterator class + class MockIterator: + def __init__(self, *args, **kwargs): + pass + + def get_next_request(self): + return Mock() + + with ( + patch("gevent.spawn") as mock_spawn, + patch("gevent.wait") as mock_wait, + patch("gradual.runners.runner.Scenario") as mock_scenario, + patch("gradual.runners.scenario._Request", MockRequest), + patch("gradual.runners.scenario.RequestIterator", MockIterator), + ): + # Create a new mock for each scenario + mock_instances = [] + for _ in mock_scenario_configs: + mock_instance = Mock() + mock_instance.execute.side_effect = mock_execute + mock_instances.append(mock_instance) + + mock_scenario.side_effect = mock_instances + + # Set up spawn to actually requests the execute function + def spawn_side_effect(func): + func() + return Mock() + + mock_spawn.side_effect = spawn_side_effect + + runner = Runner(mock_scenario_configs) + runner.start_test() + + # Verify scenarios were executed in order + assert execution_order == [1, 2] + runner.stop_runner() + + +def 
+def test_runner_stop_before_start():
+    """Test stopping runner before starting any tests."""
+    runner = Runner([])
+    runner.stop_runner()  # Should not raise any errors
+
+    assert len(runner.running_scenarios) == 0
+    assert len(runner.running_scenarios_task) == 0
diff --git a/tests/test_scenario.py b/tests/test_scenario.py
new file mode 100644
index 0000000..9a52b91
--- /dev/null
+++ b/tests/test_scenario.py
@@ -0,0 +1,228 @@
+import pytest
+import gevent
+from gradual.configs.scenario import ScenarioConfig
+from gradual.configs.request import RequestConfig
+from gradual.constants.request_types import RequestType
+from gradual.runners.iterators import RequestIterator
+from gradual.runners.scenario import Scenario
+from gradual.runners.request.base import _Request
+from gradual.runners.request.Http import HttpRequest
+from gradual.runners.request.SocketIO import SocketRequest
+
+
+@pytest.fixture
+def scenario_config():
+    """Create a basic scenario config for testing."""
+    # Create request config with invalid type to trigger implementation error
+    request_config = RequestConfig(
+        name="somerequest",
+        params={},
+        http_method="GET",
+        expected_response_time=1.0,
+        url=None,
+        type="Something",
+    )
+
+    return ScenarioConfig(
+        name="test_scenario",
+        min_concurrency=2,
+        max_concurrency=10,
+        ramp_up=[2, 3, 4],
+        ramp_up_wait=[0.001],  # 1ms wait
+        multiply=False,
+        run_once=False,
+        iterate_through_requests=False,
+        request_configs=[request_config],
+    )
+
+
+def test_do_ramp_up(scenario_config):
+    """Test ramp up with requests."""
+    scenario = Scenario(scenario_config)
+
+    scenario.do_ramp_up(2)
+
+    # Wait for tasks to complete
+    gevent.wait(scenario.running_request_tasks)
+
+    # Check that all tasks failed with NotImplementedError
+    for task in scenario.running_request_tasks:
+        assert isinstance(task.exception, NotImplementedError)
+        assert str(task.exception) == "Expected subclasses to implement this method."
+
+    # Verify the requests were made correctly
+    assert len(scenario.running_request_tasks) == 2
+    assert len(scenario.requests) == 2
+    assert all(isinstance(request, _Request) for request in scenario.requests)
+    assert not any(
+        isinstance(request, (HttpRequest, SocketRequest))
+        for request in scenario.requests
+    )
+
+
+def test_do_ramp_up_max_concurrency(scenario_config):
+    """Test ramp up respects max concurrency limit."""
+    scenario_config.max_concurrency = 3
+
+    scenario = Scenario(scenario_config)
+
+    scenario.do_ramp_up(5)  # Try to ramp up more than max_concurrency
+
+    # Wait for tasks to complete
+    gevent.wait(scenario.running_request_tasks)
+
+    assert len(scenario.running_request_tasks) <= scenario_config.max_concurrency
+
+
+def test_do_ramp_up_with_iteration(scenario_config):
+    """Test ramp up when iterate_through_requests is True."""
+    scenario_config.iterate_through_requests = True
+
+    scenario = Scenario(scenario_config)
+
+    scenario.do_ramp_up(2)
+
+    # Wait for tasks to complete
+    gevent.wait(scenario.running_request_tasks)
+
+    assert len(scenario.running_request_tasks) == 2
+    assert len(scenario.requests) == 2
+
+
+def test_do_ramp_up_with_multiple_requests(scenario_config):
+    """Test ramp up with multiple request configs to verify iterator behavior."""
+    # Create multiple request configs
+    request_configs = [
+        RequestConfig(
+            name="request1",
+            params={},
+            http_method="GET",
+            expected_response_time=1.0,
+            url=None,
+            type="Something",
+        ),
+        RequestConfig(
+            name="request2",
+            params={},
+            http_method="GET",
+            expected_response_time=1.0,
+            url=None,
+            type="Something",
+        ),
+        RequestConfig(
+            name="request3",
+            params={},
+            http_method="GET",
+            expected_response_time=1.0,
+            url=None,
+            type="Something",
+        ),
+    ]
+
+    # Update scenario config with multiple requests
+    scenario_config.request_configs = request_configs
+    scenario_config.iterate_through_requests = True
+
+    scenario = Scenario(scenario_config)
+
+    # Ramp up to create more requests than we have request configs
+    scenario.do_ramp_up(5)
+
+    # Wait for tasks to complete
+    gevent.wait(scenario.running_request_tasks)
+
+    # Check that all tasks failed with NotImplementedError
+    for task in scenario.running_request_tasks:
+        assert isinstance(task.exception, NotImplementedError)
+        assert str(task.exception) == "Expected subclasses to implement this method."
+
+    # Verify the requests were made correctly
+    assert len(scenario.running_request_tasks) == 5
+    assert len(scenario.requests) == 5
+
+    # Verify that requests were created in the correct order
+    # The first 3 requests should match our request configs in order
+    for i in range(3):
+        assert scenario.requests[i].iterator.get_next_request().name == f"request{i+1}"
+
+    # The last 2 requests should cycle back to the beginning
+    assert scenario.requests[3].iterator.get_next_request().name == "request1"
+    assert scenario.requests[4].iterator.get_next_request().name == "request2"
+
+    # Verify all requests are base _Request instances
+    assert all(isinstance(request, _Request) for request in scenario.requests)
+    assert not any(
+        isinstance(request, (HttpRequest, SocketRequest))
+        for request in scenario.requests
+    )
+
+
+def test_do_ramp_up_with_multiple_requests_no_iteration(scenario_config):
+    """Test ramp up with multiple request configs when iterate_through_requests is False."""
+    # Create multiple request configs
+    request_configs = [
+        RequestConfig(
+            name="request1",
+            params={},
+            http_method="GET",
+            expected_response_time=1.0,
+            url=None,
+            type="Something",
+        ),
+        RequestConfig(
+            name="request2",
+            params={},
+            http_method="GET",
+            expected_response_time=1.0,
+            url=None,
+            type="Something",
+        ),
+        RequestConfig(
+            name="request3",
+            params={},
+            http_method="GET",
+            expected_response_time=1.0,
+            url=None,
+            type="Something",
+        ),
+    ]
+
+    # Update scenario config with multiple requests
+    scenario_config.request_configs = request_configs
+    scenario_config.iterate_through_requests = False
+
+    scenario = Scenario(scenario_config)
+
+    # Ramp up to create more requests than we have request configs
+    scenario.do_ramp_up(5)
+
+    # Wait for tasks to complete
+    gevent.wait(scenario.running_request_tasks)
+
+    # Check that all tasks failed with NotImplementedError
+    for task in scenario.running_request_tasks:
+        assert isinstance(task.exception, NotImplementedError)
+        assert str(task.exception) == "Expected subclasses to implement this method."
+
+    # Verify the requests were made correctly
+    assert len(scenario.running_request_tasks) == 5
+    assert len(scenario.requests) == 5
+
+    # Verify that each request has its own iterator with a single request config
+    # The first 3 requests should match our request configs in order
+    for i in range(3):
+        assert len(scenario.requests[i].iterator.request_types) == 1
+        assert scenario.requests[i].iterator.request_types[0].name == f"request{i+1}"
+
+    # The last 2 requests should cycle back to the beginning
+    assert len(scenario.requests[3].iterator.request_types) == 1
+    assert scenario.requests[3].iterator.request_types[0].name == "request1"
+    assert len(scenario.requests[4].iterator.request_types) == 1
+    assert scenario.requests[4].iterator.request_types[0].name == "request2"
+
+    # Verify all requests are base _Request instances
+    assert all(isinstance(request, _Request) for request in scenario.requests)
+    assert not any(
+        isinstance(request, (HttpRequest, SocketRequest))
+        for request in scenario.requests
+    )
diff --git a/tests/test_session.py b/tests/test_session.py
new file mode 100644
index 0000000..f49babe
--- /dev/null
+++ b/tests/test_session.py
@@ -0,0 +1,76 @@
+import pytest
+from unittest.mock import Mock, patch
+from gradual.runners.session import HTTPSession
+from requests.adapters import HTTPAdapter
+
+
+@pytest.fixture
+def session():
+    return HTTPSession(pool_connections=10, pool_maxsize=20)
+
+
+def test_session_initialization():
+    """Test that HTTPSession initializes correctly with pool settings."""
+    session = HTTPSession(pool_connections=10, pool_maxsize=20)
+
+    assert isinstance(session.adapter, HTTPAdapter)
+    assert session.adapter._pool_connections == 10
+    assert session.adapter._pool_maxsize == 20
+
+
+def test_session_mounts():
+    """Test that session mounts adapters for both HTTP and HTTPS."""
+    session = HTTPSession(pool_connections=10, pool_maxsize=20)
+
+    assert session.adapters["http://"] is session.adapter
+    assert session.adapters["https://"] is session.adapter
+
+
+def test_session_close():
+    """Test that session close properly cleans up resources."""
+    with patch.object(HTTPAdapter, "close") as mock_close:
+        session = HTTPSession(pool_connections=10, pool_maxsize=20)
+        session.close()
+
+        mock_close.assert_called_once()
+
+
+def test_session_inheritance():
+    """Test that HTTPSession properly inherits from requests.Session."""
+    session = HTTPSession(pool_connections=10, pool_maxsize=20)
+
+    from requests import Session
+
+    assert isinstance(session, Session)
+
+
+def test_session_pool_limits():
+    """Test that pool limits are properly enforced."""
+    session = HTTPSession(pool_connections=5, pool_maxsize=10)
+
+    assert session.adapter._pool_connections <= 5
+    assert session.adapter._pool_maxsize <= 10
+
+
+def test_session_adapter_reuse():
+    """Test that the same adapter is used for both HTTP and HTTPS."""
+    session = HTTPSession(pool_connections=10, pool_maxsize=20)
+
+    assert session.adapters["http://"] is session.adapters["https://"]
+
+
+def test_session_with_additional_args():
+    """Test session initialization with additional arguments."""
+    session = HTTPSession(pool_connections=10, pool_maxsize=20)
+    session.headers.update({"User-Agent": "Test"})
+    assert session.headers["User-Agent"] == "Test"
+
+
+def test_session_close_multiple_times():
+    """Test that closing session multiple times is safe."""
+    with patch.object(HTTPAdapter, "close") as mock_close:
+        session = HTTPSession(pool_connections=10, pool_maxsize=20)
+        session.close()
+        session.close()
+
+        assert mock_close.call_count == 2