From b7129a529a65c13bb5aee15d80573e7a91a33f37 Mon Sep 17 00:00:00 2001 From: kasimlyee Date: Fri, 9 Jan 2026 09:20:25 +0300 Subject: [PATCH 01/15] Setup for tests --- .coveragerc | 35 ++++++ .github/workflows/tests.yml | 106 +++++++++++++++++ Tests/README.md | 219 ++++++++++++++++++++++++++++++++++++ Tests/conftest.py | 58 ++++++++++ pyproject.toml | 46 +++++++- pytest.ini | 39 +++++++ run_tests.bat | 51 +++++++++ run_tests.sh | 47 ++++++++ 8 files changed, 599 insertions(+), 2 deletions(-) create mode 100644 .coveragerc create mode 100644 .github/workflows/tests.yml create mode 100644 Tests/README.md create mode 100644 Tests/conftest.py create mode 100644 pytest.ini create mode 100644 run_tests.bat create mode 100644 run_tests.sh diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..3b71d7d --- /dev/null +++ b/.coveragerc @@ -0,0 +1,35 @@ +[run] +source = jsweb +branch = True +omit = + */tests/* + */test_*.py + */__pycache__/* + */venv/* + */.venv/* + setup.py + +[report] +precision = 2 +show_missing = True +skip_covered = False +exclude_lines = + pragma: no cover + def __repr__ + raise AssertionError + raise NotImplementedError + if __name__ == .__main__.: + if TYPE_CHECKING: + if typing.TYPE_CHECKING: + @abstractmethod + @abc.abstractmethod + pass + ... + except ImportError: + except KeyboardInterrupt: + +[html] +directory = htmlcov + +[xml] +output = coverage.xml diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..071cc66 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,106 @@ +name: Tests + +on: + push: + branches: + - main + - develop + pull_request: + branches: + - main + - develop + +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + fail-fast: false + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: "pip" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip setuptools wheel + pip install -e ".[dev]" + + - name: Run pytest with coverage + run: | + pytest Tests/ -v --cov=jsweb --cov-report=xml --cov-report=html --cov-report=term-missing + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 + with: + files: ./coverage.xml + flags: unittests + name: codecov-umbrella + fail_ci_if_error: false + verbose: true + + - name: Archive coverage reports + if: always() + uses: actions/upload-artifact@v3 + with: + name: coverage-report-py${{ matrix.python-version }} + path: htmlcov/ + + lint: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + cache: "pip" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e ".[dev]" + + - name: Run black (code formatting check) + run: black --check jsweb Tests + + - name: Run isort (import sorting check) + run: isort --check-only jsweb Tests + + - name: Run flake8 (linting) + run: flake8 jsweb Tests + + - name: Run mypy (type checking) + run: mypy jsweb --ignore-missing-imports --no-error-summary 2>/dev/null || true + + security: + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + cache: "pip" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e ".[dev]" + + - name: Run bandit (security scan) + run: bandit -r jsweb -f json -o bandit-report.json || true + + - name: Run safety (dependency check) + run: safety check --json || true diff --git a/Tests/README.md b/Tests/README.md new file mode 100644 index 0000000..44e026c --- /dev/null +++ b/Tests/README.md @@ -0,0 +1,219 @@ +# Pytest Setup and CI/CD Integration + +## Local Testing + +### Installation + +Install development dependencies including pytest: + +```bash +pip install -e ".[dev]" +``` + +### Running Tests + +Run all tests: + +```bash +pytest +``` + +Run tests with coverage report: + +```bash +pytest --cov=jsweb --cov-report=html +``` + +Run specific test file: + +```bash +pytest Tests/test_routing.py -v +``` + +Run tests with specific marker: + +```bash +pytest -m unit +pytest -m integration +pytest -m slow +``` + +Run tests matching a pattern: + +```bash +pytest -k "test_form" -v +``` + +### Available Test Markers + +- `@pytest.mark.unit` - Unit tests +- `@pytest.mark.integration` - Integration tests +- `@pytest.mark.slow` - Slow running tests +- `@pytest.mark.asyncio` - Async tests +- `@pytest.mark.forms` - Form validation tests +- `@pytest.mark.routing` - Routing tests +- `@pytest.mark.database` - Database tests +- `@pytest.mark.security` - Security tests + +### Coverage Reports + +After running tests with `--cov`, view the HTML coverage report: + +```bash +# On Windows +start htmlcov/index.html + +# On Linux/Mac +open htmlcov/index.html +``` + +## CI/CD Integration + +### GitHub Actions Workflow + +The project includes a GitHub Actions workflow (`.github/workflows/tests.yml`) that: + +1. **Tests Job** - Runs tests on multiple Python versions (3.8-3.12) + - Installs dependencies + - Runs pytest with coverage + - Uploads coverage to Codecov + - Archives coverage reports as artifacts + +2. **Lint Job** - Checks code quality + - Black (code formatting) + - isort (import sorting) + - Flake8 (linting) + - MyPy (type checking) + +3. **Security Job** - Scans for security issues + - Bandit (security analysis) + - Safety (dependency vulnerabilities) + +### Workflow Triggers + +The workflow runs automatically on: + +- Push to `main` and `develop` branches +- Pull requests to `main` and `develop` branches + +### Codecov Integration + +Coverage reports are automatically uploaded to Codecov. Add a `CODECOV_TOKEN` secret in your GitHub repository settings for authenticated uploads (optional). + +## Configuration Files + +### pytest.ini + +Main pytest configuration file with: +- Test discovery patterns +- Output options +- Test markers +- Asyncio mode settings + +### pyproject.toml + +Contains additional pytest and coverage configuration: +- `[tool.pytest.ini_options]` - Pytest settings +- `[tool.coverage.run]` - Coverage collection settings +- `[tool.coverage.report]` - Coverage report options + +### .coveragerc + +Detailed coverage configuration: +- Source paths +- Files to omit +- Excluded lines +- Report formats + +## Pre-commit Hooks + +To run tests and linting before commits, set up pre-commit: + +```bash +pre-commit install +pre-commit run --all-files +``` + +The `.pre-commit-config.yaml` should include pytest and other linting tools. + +## Tips for Writing Tests + +### Basic Test Structure + +```python +import pytest +from jsweb import App + +@pytest.mark.unit +def test_app_creation(): + """Test that an app can be created.""" + app = App(__name__) + assert app is not None +``` + +### Using Fixtures + +```python +@pytest.mark.unit +def test_with_app(app): + """Test using the app fixture.""" + assert app is not None + +@pytest.mark.unit +def test_with_config(config): + """Test using the config fixture.""" + assert config.TESTING is True +``` + +### Async Tests + +```python +@pytest.mark.asyncio +async def test_async_operation(): + """Test async code.""" + result = await some_async_function() + assert result is not None +``` + +### Parametrized Tests + +```python +@pytest.mark.parametrize("input,expected", [ + ("test", "test"), + ("TEST", "test"), + ("Test", "test"), +]) +def test_string_lowercase(input, expected): + """Test string lowercasing with multiple inputs.""" + assert input.lower() == expected +``` + +## Troubleshooting + +### ImportError: No module named 'jsweb' + +Install the package in development mode: + +```bash +pip install -e . +``` + +### Coverage not showing results + +Make sure to use: + +```bash +pytest --cov=jsweb +``` + +### Tests not being discovered + +Check that test files follow the pattern: `test_*.py` and test functions start with `test_` + +### Async test issues + +Ensure pytest-asyncio is installed: + +```bash +pip install pytest-asyncio +``` diff --git a/Tests/conftest.py b/Tests/conftest.py new file mode 100644 index 0000000..7b86d5b --- /dev/null +++ b/Tests/conftest.py @@ -0,0 +1,58 @@ +"""Pytest configuration and shared fixtures for jsweb tests.""" + +import sys +from pathlib import Path + +import pytest + +# Add the parent directory to the path so we can import jsweb +sys.path.insert(0, str(Path(__file__).parent.parent)) + + +@pytest.fixture +def app(): + """Create a basic jsweb application for testing.""" + from jsweb import App + + app = App(__name__) + return app + + +@pytest.fixture +def client(app): + """Create a test client for the app.""" + # This is a simple implementation - you may need to adjust based on your app + return app + + +@pytest.fixture +def config(): + """Provide a test configuration.""" + class TestConfig: + DEBUG = True + TESTING = True + SECRET_KEY = "test-secret-key" + DATABASE_URL = "sqlite:///:memory:" + + return TestConfig() + + +@pytest.fixture +def sample_form_data(): + """Provide sample form data for testing.""" + return { + "username": "testuser", + "email": "test@example.com", + "password": "testpass123", + } + + +@pytest.fixture +def sample_json_data(): + """Provide sample JSON data for testing.""" + return { + "name": "Test User", + "email": "test@example.com", + "age": 30, + "active": True + } diff --git a/pyproject.toml b/pyproject.toml index 0272c18..a9193da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,8 +76,8 @@ jsweb = "jsweb.cli:cli" [project.urls] Documentation = "https://jsweb-framework.site/" -Homepage = "https://github.com/Jones-peter/jsweb" -"Bug Tracker" = "https://github.com/Jones-peter/jsweb/issues" +Homepage = "https://github.com/Jsweb-Tech/jsweb" +"Bug Tracker" = "https://github.com/Jsweb-Tech/jsweb/issues" [tool.setuptools.packages.find] where = ["."] @@ -100,3 +100,45 @@ jsweb = [ "admin/templates/*.html", "admin/static/*" ] + +[tool.pytest.ini_options] +testpaths = ["Tests"] +python_files = "test_*.py" +python_classes = "Test*" +python_functions = "test_*" +addopts = "-v --strict-markers --tb=short --cov=jsweb --cov-report=html --cov-report=term-missing" +markers = [ + "unit: Unit tests", + "integration: Integration tests", + "slow: Tests that take a long time to run", + "asyncio: Async tests" +] +asyncio_mode = "auto" + +[tool.coverage.run] +source = ["jsweb"] +omit = [ + "*/tests/*", + "*/test_*.py", + "*/__pycache__/*", + "*/venv/*", + "*/.venv/*" +] + +[tool.coverage.report] +exclude_lines = [ + "pragma: no cover", + "def __repr__", + "raise AssertionError", + "raise NotImplementedError", + "if __name__ == .__main__.:", + "if TYPE_CHECKING:", + "if typing.TYPE_CHECKING:", + "@abstractmethod" +] +precision = 2 +skip_covered = false +skip_empty = true + +[tool.coverage.html] +directory = "htmlcov" diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..9b6c056 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,39 @@ +[pytest] +# Pytest configuration file for jsweb + +# Test paths +testpaths = Tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* + +# Output options +addopts = + -v + --strict-markers + --tb=short + --cov=jsweb + --cov-report=html + --cov-report=term-missing + --cov-report=xml + -ra + +# Markers for categorizing tests +markers = + unit: Unit tests (mark with @pytest.mark.unit) + integration: Integration tests (mark with @pytest.mark.integration) + slow: Tests that take a long time to run (mark with @pytest.mark.slow) + asyncio: Async tests (mark with @pytest.mark.asyncio) + forms: Form validation tests + routing: Routing tests + database: Database tests + security: Security tests + +# Asyncio configuration +asyncio_mode = auto + +# Timeout for tests (seconds) +timeout = 300 + +# Minimum Python version +minversion = 7.0 diff --git a/run_tests.bat b/run_tests.bat new file mode 100644 index 0000000..a760a1b --- /dev/null +++ b/run_tests.bat @@ -0,0 +1,51 @@ +@echo off +REM Quick test runner script for jsweb (Windows) + +setlocal enabledelayedexpansion + +echo. +echo ================================================ +echo JsWeb Test Suite +echo ================================================ +echo. + +REM Check if virtual environment exists +if not exist "venv\" ( + if not exist ".venv\" ( + echo Creating virtual environment... + python -m venv venv + call venv\Scripts\activate.bat + python -m pip install --upgrade pip + ) else ( + call .venv\Scripts\activate.bat + ) +) else ( + call venv\Scripts\activate.bat +) + +REM Install dependencies +echo Installing dependencies... +pip install -e ".[dev]" + +REM Run tests +echo. +echo Running tests... +echo. +pytest Tests/ -v --cov=jsweb --cov-report=html --cov-report=term-missing + +REM Check exit code +if %ERRORLEVEL% EQU 0 ( + echo. + echo Tests passed! + echo Coverage report generated in htmlcov\index.html +) else ( + echo. + echo Tests failed! + exit /b 1 +) + +echo. +echo ================================================ +echo. + +pause diff --git a/run_tests.sh b/run_tests.sh new file mode 100644 index 0000000..0e095ee --- /dev/null +++ b/run_tests.sh @@ -0,0 +1,47 @@ +#!/bin/bash +# Quick test runner script for jsweb + +set -e + +echo "================================================" +echo "JsWeb Test Suite" +echo "================================================" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +# Check if virtual environment exists +if [ ! -d "venv" ] && [ ! -d ".venv" ]; then + echo -e "${YELLOW}Virtual environment not found. Creating one...${NC}" + python -m venv venv + source venv/bin/activate + pip install --upgrade pip +else + if [ -d ".venv" ]; then + source .venv/bin/activate + else + source venv/bin/activate + fi +fi + +# Install dependencies +echo -e "${YELLOW}Installing dependencies...${NC}" +pip install -e ".[dev]" + +# Run tests +echo -e "${YELLOW}Running tests...${NC}" +pytest Tests/ -v --cov=jsweb --cov-report=html --cov-report=term-missing + +# Check exit code +if [ $? -eq 0 ]; then + echo -e "${GREEN}Tests passed!${NC}" + echo -e "${GREEN}Coverage report generated in htmlcov/index.html${NC}" +else + echo -e "${RED}Tests failed!${NC}" + exit 1 +fi + +echo "================================================" From 3f197f8cd3c395c11337ef610ccdca98f655e466 Mon Sep 17 00:00:00 2001 From: kasimlyee Date: Fri, 9 Jan 2026 09:33:49 +0300 Subject: [PATCH 02/15] tests added with run scripts --- README.md | 2 +- Tests/{README.md => TESTS_GUIDE.md} | 0 run_tests.bat => scripts/run_tests.bat | 0 run_tests.sh => scripts/run_tests.sh | 0 4 files changed, 1 insertion(+), 1 deletion(-) rename Tests/{README.md => TESTS_GUIDE.md} (100%) rename run_tests.bat => scripts/run_tests.bat (100%) rename run_tests.sh => scripts/run_tests.sh (100%) diff --git a/README.md b/README.md index 3fdb98f..76b1f15 100644 --- a/README.md +++ b/README.md @@ -363,7 +363,7 @@ This project is licensed under the **MIT License** - see [LICENSE](LICENSE) file ---

- Made with ❤️ by the JsWeb team
+ Made and Maintained by the JsWeb team
Join our Discord communitySponsor us

diff --git a/Tests/README.md b/Tests/TESTS_GUIDE.md similarity index 100% rename from Tests/README.md rename to Tests/TESTS_GUIDE.md diff --git a/run_tests.bat b/scripts/run_tests.bat similarity index 100% rename from run_tests.bat rename to scripts/run_tests.bat diff --git a/run_tests.sh b/scripts/run_tests.sh similarity index 100% rename from run_tests.sh rename to scripts/run_tests.sh From 5e42da0906cc767b34b40645729ba32bd152ab9b Mon Sep 17 00:00:00 2001 From: kasimlyee Date: Fri, 9 Jan 2026 10:16:57 +0300 Subject: [PATCH 03/15] Added more tests files --- Tests/conftest.py | 113 ++++++++ Tests/test_authentication.py | 370 +++++++++++++++++++++++++ Tests/test_csrf_json.py | 124 --------- Tests/test_database.py | 358 ++++++++++++++++++++++++ Tests/test_features.py | 213 +++++++++++++++ Tests/test_forms.py | 364 ++++++++++++++++++++++++ Tests/test_framework_comparison.py | 392 -------------------------- Tests/test_middleware.py | 349 +++++++++++++++++++++++ Tests/test_new_features.py | 128 --------- Tests/test_optimized_routing.py | 32 --- Tests/test_performance.py | 241 ++++++++++++++++ Tests/test_request_response.py | 426 +++++++++++++++++++++++++++++ Tests/test_routing.py | 256 +++++++++++++++++ Tests/test_routing_comparison.py | 155 ----------- Tests/test_routing_optimized.py | 139 ---------- Tests/test_routing_scale.py | 126 --------- Tests/test_security.py | 311 +++++++++++++++++++++ 17 files changed, 3001 insertions(+), 1096 deletions(-) create mode 100644 Tests/test_authentication.py delete mode 100644 Tests/test_csrf_json.py create mode 100644 Tests/test_database.py create mode 100644 Tests/test_features.py create mode 100644 Tests/test_forms.py delete mode 100644 Tests/test_framework_comparison.py create mode 100644 Tests/test_middleware.py delete mode 100644 Tests/test_new_features.py delete mode 100644 Tests/test_optimized_routing.py create mode 100644 Tests/test_performance.py create mode 100644 Tests/test_request_response.py create mode 100644 Tests/test_routing.py delete mode 100644 Tests/test_routing_comparison.py delete mode 100644 Tests/test_routing_optimized.py delete mode 100644 Tests/test_routing_scale.py create mode 100644 Tests/test_security.py diff --git a/Tests/conftest.py b/Tests/conftest.py index 7b86d5b..2b8d043 100644 --- a/Tests/conftest.py +++ b/Tests/conftest.py @@ -2,6 +2,7 @@ import sys from pathlib import Path +from io import BytesIO import pytest @@ -15,6 +16,7 @@ def app(): from jsweb import App app = App(__name__) + app.config.TESTING = True return app @@ -33,6 +35,7 @@ class TestConfig: TESTING = True SECRET_KEY = "test-secret-key" DATABASE_URL = "sqlite:///:memory:" + SQLALCHEMY_ECHO = False return TestConfig() @@ -56,3 +59,113 @@ def sample_json_data(): "age": 30, "active": True } + + +@pytest.fixture +def fake_environ(): + """Provide a fake WSGI environ dict for request testing.""" + def _make_environ( + method='GET', + path='/', + query_string='', + content_type='application/x-www-form-urlencoded', + content_length=0, + body=b'', + cookies='' + ): + return { + 'REQUEST_METHOD': method, + 'CONTENT_TYPE': content_type, + 'CONTENT_LENGTH': str(content_length), + 'PATH_INFO': path, + 'QUERY_STRING': query_string, + 'HTTP_COOKIE': cookies, + 'wsgi.input': BytesIO(body), + 'SERVER_NAME': 'testserver', + 'SERVER_PORT': '80', + 'wsgi.url_scheme': 'http', + } + + return _make_environ + + +@pytest.fixture +def json_request_environ(fake_environ): + """Create a JSON POST request environ.""" + import json + + data = {"key": "value", "number": 42} + body = json.dumps(data).encode('utf-8') + + return fake_environ( + method='POST', + path='/api/test', + content_type='application/json', + content_length=len(body), + body=body + ) + + +@pytest.fixture +def form_request_environ(fake_environ): + """Create a form POST request environ.""" + body = b'username=testuser&email=test@example.com' + + return fake_environ( + method='POST', + path='/form', + content_type='application/x-www-form-urlencoded', + content_length=len(body), + body=body + ) + + +@pytest.fixture +def file_upload_environ(fake_environ): + """Create a file upload request environ.""" + boundary = '----WebKitFormBoundary' + body = ( + f'--{boundary}\r\n' + f'Content-Disposition: form-data; name="file"; filename="test.txt"\r\n' + f'Content-Type: text/plain\r\n' + f'\r\n' + f'test file content\r\n' + f'--{boundary}--\r\n' + ).encode('utf-8') + + return fake_environ( + method='POST', + path='/upload', + content_type=f'multipart/form-data; boundary={boundary}', + content_length=len(body), + body=body + ) + + +# Markers configuration +def pytest_configure(config): + """Configure custom pytest markers.""" + config.addinivalue_line( + "markers", "unit: Unit tests that test individual components" + ) + config.addinivalue_line( + "markers", "integration: Integration tests that test multiple components together" + ) + config.addinivalue_line( + "markers", "slow: Tests that take a long time to run" + ) + config.addinivalue_line( + "markers", "asyncio: Async tests" + ) + config.addinivalue_line( + "markers", "forms: Form validation tests" + ) + config.addinivalue_line( + "markers", "routing: Routing tests" + ) + config.addinivalue_line( + "markers", "database: Database tests" + ) + config.addinivalue_line( + "markers", "security: Security tests" + ) diff --git a/Tests/test_authentication.py b/Tests/test_authentication.py new file mode 100644 index 0000000..7cae2af --- /dev/null +++ b/Tests/test_authentication.py @@ -0,0 +1,370 @@ +"""Tests for JsWeb authentication and user management.""" + +import pytest + + +@pytest.mark.unit +def test_user_model(): + """Test basic user model.""" + try: + from sqlalchemy import Column, Integer, String + from sqlalchemy.orm import declarative_base + + Base = declarative_base() + + class User(Base): + __tablename__ = 'users' + id = Column(Integer, primary_key=True) + username = Column(String(80), unique=True, nullable=False) + email = Column(String(120), unique=True, nullable=False) + password_hash = Column(String(255), nullable=False) + is_active = Column(Integer, default=1) + + assert User is not None + assert hasattr(User, 'username') + assert hasattr(User, 'email') + assert hasattr(User, 'password_hash') + except ImportError: + pytest.skip("SQLAlchemy not available") + + +@pytest.mark.unit +def test_user_authentication(): + """Test user authentication workflow.""" + try: + from jsweb.security import hash_password, check_password + + password = "secure_password_123" + hashed = hash_password(password) + + # Correct password + assert check_password(password, hashed) + + # Wrong password + assert not check_password("wrong_password", hashed) + except ImportError: + pytest.skip("Password hashing not available") + + +@pytest.mark.unit +def test_session_management(): + """Test session creation and management.""" + try: + from jsweb.security import generate_session_token + + token = generate_session_token() + assert token is not None + assert len(token) >= 32 + except ImportError: + pytest.skip("Session management not available") + + +@pytest.mark.unit +def test_login_attempt_tracking(): + """Test login attempt tracking.""" + # Basic test structure for login attempt tracking + class LoginAttempt: + def __init__(self, user_id, success=False): + self.user_id = user_id + self.success = success + self.attempts = 0 + + def increment(self): + self.attempts += 1 + + def reset(self): + self.attempts = 0 + + attempt = LoginAttempt(user_id=1) + assert attempt.attempts == 0 + + attempt.increment() + assert attempt.attempts == 1 + + +@pytest.mark.unit +def test_password_reset_token(): + """Test password reset token generation.""" + try: + from jsweb.security import generate_secure_token + + reset_token = generate_secure_token() + assert reset_token is not None + assert len(reset_token) >= 32 + except ImportError: + pytest.skip("Token generation not available") + + +@pytest.mark.unit +def test_email_verification(): + """Test email verification token.""" + try: + from jsweb.security import generate_secure_token + + verification_token = generate_secure_token() + assert verification_token is not None + except ImportError: + pytest.skip("Token generation not available") + + +@pytest.mark.unit +def test_two_factor_authentication_setup(): + """Test 2FA setup.""" + # Basic 2FA structure + class TwoFactorAuth: + def __init__(self, user_id): + self.user_id = user_id + self.enabled = False + self.secret = None + + def enable(self, secret): + self.enabled = True + self.secret = secret + + def disable(self): + self.enabled = False + self.secret = None + + mfa = TwoFactorAuth(user_id=1) + assert not mfa.enabled + + mfa.enable(secret="secret123") + assert mfa.enabled + assert mfa.secret == "secret123" + + +@pytest.mark.unit +def test_permission_system(): + """Test permission-based access control.""" + class Permission: + def __init__(self, name, description=""): + self.name = name + self.description = description + + class Role: + def __init__(self, name): + self.name = name + self.permissions = [] + + def add_permission(self, permission): + self.permissions.append(permission) + + def has_permission(self, permission_name): + return any(p.name == permission_name for p in self.permissions) + + admin_role = Role("Admin") + read_perm = Permission("read") + write_perm = Permission("write") + delete_perm = Permission("delete") + + admin_role.add_permission(read_perm) + admin_role.add_permission(write_perm) + admin_role.add_permission(delete_perm) + + assert admin_role.has_permission("read") + assert admin_role.has_permission("write") + assert admin_role.has_permission("delete") + assert not admin_role.has_permission("admin") + + +@pytest.mark.unit +def test_user_roles(): + """Test user role assignment.""" + class User: + def __init__(self, username): + self.username = username + self.roles = [] + + def add_role(self, role): + if role not in self.roles: + self.roles.append(role) + + def remove_role(self, role): + if role in self.roles: + self.roles.remove(role) + + def has_role(self, role_name): + return any(r == role_name for r in self.roles) + + user = User("john_doe") + user.add_role("user") + + assert user.has_role("user") + assert not user.has_role("admin") + + user.add_role("admin") + assert user.has_role("admin") + + +@pytest.mark.unit +def test_authentication_middleware(): + """Test authentication middleware basics.""" + class AuthMiddleware: + def __init__(self): + self.authenticated_users = {} + + def authenticate(self, username, token): + if username in self.authenticated_users: + return self.authenticated_users[username] == token + return False + + def login(self, username, token): + self.authenticated_users[username] = token + + def logout(self, username): + if username in self.authenticated_users: + del self.authenticated_users[username] + + middleware = AuthMiddleware() + assert not middleware.authenticate("user1", "token1") + + middleware.login("user1", "token1") + assert middleware.authenticate("user1", "token1") + + middleware.logout("user1") + assert not middleware.authenticate("user1", "token1") + + +@pytest.mark.unit +def test_jwt_token_support(): + """Test JWT token support (if available).""" + try: + import jwt + from datetime import datetime, timedelta + + secret = "test-secret" + payload = { + 'user_id': 1, + 'username': 'john', + 'exp': datetime.utcnow() + timedelta(hours=1) + } + + token = jwt.encode(payload, secret, algorithm='HS256') + assert token is not None + + decoded = jwt.decode(token, secret, algorithms=['HS256']) + assert decoded['user_id'] == 1 + assert decoded['username'] == 'john' + except ImportError: + pytest.skip("PyJWT not available") + + +@pytest.mark.unit +def test_session_timeout(): + """Test session timeout functionality.""" + from datetime import datetime, timedelta + + class Session: + def __init__(self, timeout_seconds=3600): + self.created_at = datetime.utcnow() + self.timeout_seconds = timeout_seconds + + def is_expired(self): + elapsed = (datetime.utcnow() - self.created_at).total_seconds() + return elapsed > self.timeout_seconds + + def remaining_time(self): + elapsed = (datetime.utcnow() - self.created_at).total_seconds() + remaining = self.timeout_seconds - elapsed + return max(0, remaining) + + session = Session(timeout_seconds=3600) + assert not session.is_expired() + assert session.remaining_time() > 0 + + +@pytest.mark.unit +def test_password_reset_flow(): + """Test password reset workflow.""" + try: + from jsweb.security import hash_password, generate_secure_token + + # Step 1: Generate reset token + reset_token = generate_secure_token() + assert reset_token is not None + + # Step 2: Hash new password + new_password = "new_secure_password_123" + new_hash = hash_password(new_password) + assert new_hash is not None + + # Step 3: Update password (simulated) + # password_hash = new_hash + + except ImportError: + pytest.skip("Security utilities not available") + + +@pytest.mark.unit +def test_account_lockout(): + """Test account lockout after failed attempts.""" + class Account: + def __init__(self, max_attempts=5): + self.failed_attempts = 0 + self.max_attempts = max_attempts + self.is_locked = False + + def failed_login(self): + self.failed_attempts += 1 + if self.failed_attempts >= self.max_attempts: + self.is_locked = True + + def reset_attempts(self): + self.failed_attempts = 0 + self.is_locked = False + + account = Account(max_attempts=3) + assert not account.is_locked + + account.failed_login() + account.failed_login() + account.failed_login() + + assert account.is_locked + assert account.failed_attempts == 3 + + +@pytest.mark.unit +def test_social_authentication(): + """Test social authentication provider integration.""" + class SocialAuth: + def __init__(self, provider): + self.provider = provider + self.oauth_token = None + + def get_auth_url(self): + return f"https://{self.provider}/oauth/authorize" + + def set_token(self, token): + self.oauth_token = token + + google_auth = SocialAuth("google.com") + assert google_auth.provider == "google.com" + assert google_auth.get_auth_url() == "https://google.com/oauth/authorize" + + +@pytest.mark.unit +def test_user_profile(): + """Test user profile management.""" + class UserProfile: + def __init__(self, user_id): + self.user_id = user_id + self.bio = "" + self.avatar_url = None + self.preferences = {} + + def update_bio(self, bio): + self.bio = bio + + def set_preference(self, key, value): + self.preferences[key] = value + + def get_preference(self, key, default=None): + return self.preferences.get(key, default) + + profile = UserProfile(user_id=1) + profile.update_bio("Software developer") + profile.set_preference("theme", "dark") + + assert profile.bio == "Software developer" + assert profile.get_preference("theme") == "dark" diff --git a/Tests/test_csrf_json.py b/Tests/test_csrf_json.py deleted file mode 100644 index f3d7dff..0000000 --- a/Tests/test_csrf_json.py +++ /dev/null @@ -1,124 +0,0 @@ -import asyncio -import httpx -import subprocess -import sys -import time -import os - -# Construct absolute path to the test application directory -TESTS_DIR = os.path.dirname(os.path.abspath(__file__)) -TEST_APP_DIR = os.path.join(TESTS_DIR, "test") - -# Ensure the test application is in the python path -sys.path.insert(0, TEST_APP_DIR) - -BASE_URL = "http://127.0.0.1:8000" - -async def run_csrf_test(): - """ - Tests that CSRF protection works correctly for various request types. - """ - print("--- Starting CSRF Logic Test ---") - async with httpx.AsyncClient(base_url=BASE_URL) as client: - try: - # 1. Make a GET request to a page to get a CSRF token from the cookie - print("Step 1: Getting CSRF token from homepage...") - get_response = await client.get("/") - get_response.raise_for_status() - assert "csrf_token" in client.cookies, "CSRF token not found in cookie" - csrf_token = client.cookies["csrf_token"] - print(f" [PASS] CSRF token received: {csrf_token[:10]}...") - - # 2. Test POST without any CSRF token (should fail) - print("\nStep 2: Testing POST to /api/test without CSRF token (expecting 403)...") - fail_response = await client.post("/api/test", json={"message": "hello"}) - assert fail_response.status_code == 403, f"Expected status 403, but got {fail_response.status_code}" - assert "CSRF token missing or invalid" in fail_response.text - print(" [PASS] Request was correctly forbidden.") - - # 3. Test POST with CSRF token in JSON body (should pass) - print("\nStep 3: Testing POST to /api/test with CSRF token in JSON body (expecting 200)...") - payload_with_token = {"message": "hello", "csrf_token": csrf_token} - success_response_body = await client.post("/api/test", json=payload_with_token) - assert success_response_body.status_code == 200, f"Expected status 200, but got {success_response_body.status_code}" - assert success_response_body.json()["message"] == "hello" - print(" [PASS] Request with token in body was successful.") - - # 4. Test POST with CSRF token in header (should pass) - print("\nStep 4: Testing POST to /api/test with CSRF token in header (expecting 200)...") - headers = {"X-CSRF-Token": csrf_token} - success_response_header = await client.post("/api/test", json={"message": "world"}, headers=headers) - assert success_response_header.status_code == 200, f"Expected status 200, but got {success_response_header.status_code}" - assert success_response_header.json()["message"] == "world" - print(" [PASS] Request with token in header was successful.") - - # 5. Test empty-body POST with CSRF token in header (should pass validation, then redirect) - print("\nStep 5: Testing empty-body POST to /logout with CSRF token in header (expecting 302)...") - # Note: The /logout endpoint redirects after success, so we expect a 302 - # We disable auto-redirects to verify the 302 status directly - empty_body_response = await client.post("/logout", headers=headers, follow_redirects=False) - - # If we got a 403, the CSRF check failed. If we got a 302, it passed! - assert empty_body_response.status_code == 302, f"Expected status 302 (Redirect), but got {empty_body_response.status_code}. (403 means CSRF failed)" - print(" [PASS] Empty-body request passed CSRF check and redirected.") - - except Exception as e: - print(f"\n--- TEST FAILED ---") - print(f"An error occurred: {e}") - import traceback - traceback.print_exc() - return False - - print("\n--- ALL CSRF TESTS PASSED ---") - return True - - -def main(): - print("Starting test server...") - server_process = subprocess.Popen( - [sys.executable, "-m", "uvicorn", "app:app"], - cwd=TEST_APP_DIR, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, # Decode stdout/stderr as text - ) - - # Give the server more time to start up - print("Waiting 5 seconds for server to start...") - time.sleep(5) - - # Check if the server process has terminated unexpectedly - if server_process.poll() is not None: - print("\n--- SERVER FAILED TO START ---") - stdout, stderr = server_process.communicate() - print("STDOUT:") - print(stdout) - print("\nSTDERR:") - print(stderr) - sys.exit(1) - - print("Server seems to be running. Starting tests.") - test_passed = False - try: - test_passed = asyncio.run(run_csrf_test()) - finally: - print("\nStopping test server...") - server_process.terminate() - # Get remaining output - try: - stdout, stderr = server_process.communicate(timeout=5) - print("\n--- Server Output ---") - print("STDOUT:") - print(stdout) - print("\nSTDERR:") - print(stderr) - except subprocess.TimeoutExpired: - print("Server did not terminate gracefully.") - - if not test_passed: - print("\nExiting with status 1 due to test failure.") - sys.exit(1) - - -if __name__ == "__main__": - main() diff --git a/Tests/test_database.py b/Tests/test_database.py new file mode 100644 index 0000000..be7c0e8 --- /dev/null +++ b/Tests/test_database.py @@ -0,0 +1,358 @@ +"""Tests for JsWeb database and ORM functionality.""" + +import pytest + + +@pytest.mark.unit +@pytest.mark.database +def test_database_connection(): + """Test database connection initialization.""" + try: + from jsweb.database import Database + + db = Database('sqlite:///:memory:') + assert db is not None + except (ImportError, TypeError): + pytest.skip("Database class not available or requires setup") + + +@pytest.mark.unit +@pytest.mark.database +def test_sqlalchemy_import(): + """Test that SQLAlchemy is available.""" + from sqlalchemy import create_engine, Column, Integer, String + + assert create_engine is not None + assert Column is not None + + +@pytest.mark.unit +@pytest.mark.database +def test_model_definition(): + """Test model definition.""" + try: + from sqlalchemy import Column, Integer, String + from sqlalchemy.orm import declarative_base + + Base = declarative_base() + + class User(Base): + __tablename__ = 'users' + id = Column(Integer, primary_key=True) + username = Column(String(80), unique=True, nullable=False) + email = Column(String(120), unique=True, nullable=False) + + assert User is not None + assert hasattr(User, '__tablename__') + except ImportError: + pytest.skip("SQLAlchemy not available") + + +@pytest.mark.unit +@pytest.mark.database +def test_model_relationships(): + """Test model relationship definitions.""" + try: + from sqlalchemy import Column, Integer, String, ForeignKey + from sqlalchemy.orm import declarative_base, relationship + + Base = declarative_base() + + class Author(Base): + __tablename__ = 'authors' + id = Column(Integer, primary_key=True) + name = Column(String(100)) + + class Book(Base): + __tablename__ = 'books' + id = Column(Integer, primary_key=True) + title = Column(String(100)) + author_id = Column(Integer, ForeignKey('authors.id')) + author = relationship("Author") + + assert Book is not None + assert hasattr(Book, 'author') + except ImportError: + pytest.skip("SQLAlchemy relationships not available") + + +@pytest.mark.unit +@pytest.mark.database +def test_database_session(): + """Test database session creation.""" + try: + from sqlalchemy import create_engine + from sqlalchemy.orm import sessionmaker + + engine = create_engine('sqlite:///:memory:') + Session = sessionmaker(bind=engine) + session = Session() + + assert session is not None + assert hasattr(session, 'query') + except ImportError: + pytest.skip("SQLAlchemy not available") + + +@pytest.mark.unit +@pytest.mark.database +def test_model_validation(): + """Test model field validation.""" + try: + from sqlalchemy import Column, Integer, String, CheckConstraint + from sqlalchemy.orm import declarative_base + + Base = declarative_base() + + class Product(Base): + __tablename__ = 'products' + id = Column(Integer, primary_key=True) + name = Column(String(100), nullable=False) + price = Column(Integer) + + assert Product is not None + except ImportError: + pytest.skip("SQLAlchemy not available") + + +@pytest.mark.unit +@pytest.mark.database +def test_migration_support(): + """Test that Alembic is available for migrations.""" + try: + from alembic import command + from alembic.config import Config + + assert command is not None + assert Config is not None + except ImportError: + pytest.skip("Alembic not available") + + +@pytest.mark.unit +@pytest.mark.database +def test_model_inheritance(): + """Test model inheritance.""" + try: + from sqlalchemy import Column, Integer, String + from sqlalchemy.orm import declarative_base + + Base = declarative_base() + + class BaseModel(Base): + __abstract__ = True + id = Column(Integer, primary_key=True) + + class User(BaseModel): + __tablename__ = 'users' + username = Column(String(80)) + + assert User is not None + assert hasattr(User, 'id') + except ImportError: + pytest.skip("SQLAlchemy not available") + + +@pytest.mark.unit +@pytest.mark.database +def test_model_indexes(): + """Test model field indexing.""" + try: + from sqlalchemy import Column, Integer, String, Index + from sqlalchemy.orm import declarative_base + + Base = declarative_base() + + class User(Base): + __tablename__ = 'users' + id = Column(Integer, primary_key=True) + email = Column(String(120), index=True) + + assert User is not None + except ImportError: + pytest.skip("SQLAlchemy not available") + + +@pytest.mark.unit +@pytest.mark.database +def test_model_constraints(): + """Test unique constraints.""" + try: + from sqlalchemy import Column, Integer, String, UniqueConstraint + from sqlalchemy.orm import declarative_base + + Base = declarative_base() + + class User(Base): + __tablename__ = 'users' + id = Column(Integer, primary_key=True) + username = Column(String(80), unique=True) + email = Column(String(120), unique=True) + + assert User is not None + except ImportError: + pytest.skip("SQLAlchemy not available") + + +@pytest.mark.unit +@pytest.mark.database +def test_model_default_values(): + """Test model default values.""" + try: + from sqlalchemy import Column, Integer, String, DateTime + from sqlalchemy.orm import declarative_base + from datetime import datetime + + Base = declarative_base() + + class Post(Base): + __tablename__ = 'posts' + id = Column(Integer, primary_key=True) + title = Column(String(100)) + created_at = Column(DateTime, default=datetime.utcnow) + + assert Post is not None + except ImportError: + pytest.skip("SQLAlchemy not available") + + +@pytest.mark.unit +@pytest.mark.database +def test_nullable_fields(): + """Test nullable field configuration.""" + try: + from sqlalchemy import Column, Integer, String + from sqlalchemy.orm import declarative_base + + Base = declarative_base() + + class User(Base): + __tablename__ = 'users' + id = Column(Integer, primary_key=True) + username = Column(String(80), nullable=False) + phone = Column(String(20), nullable=True) + + assert User is not None + except ImportError: + pytest.skip("SQLAlchemy not available") + + +@pytest.mark.unit +@pytest.mark.database +def test_model_repr(): + """Test model string representation.""" + try: + from sqlalchemy import Column, Integer, String + from sqlalchemy.orm import declarative_base + + Base = declarative_base() + + class User(Base): + __tablename__ = 'users' + id = Column(Integer, primary_key=True) + username = Column(String(80)) + + def __repr__(self): + return f"" + + assert User is not None + except ImportError: + pytest.skip("SQLAlchemy not available") + + +@pytest.mark.unit +@pytest.mark.database +def test_enum_field(): + """Test enum field type.""" + try: + from sqlalchemy import Column, Integer, String, Enum + from sqlalchemy.orm import declarative_base + import enum + + Base = declarative_base() + + class UserRole(enum.Enum): + ADMIN = 'admin' + USER = 'user' + GUEST = 'guest' + + class User(Base): + __tablename__ = 'users' + id = Column(Integer, primary_key=True) + role = Column(Enum(UserRole)) + + assert User is not None + except ImportError: + pytest.skip("SQLAlchemy Enum not available") + + +@pytest.mark.unit +@pytest.mark.database +def test_json_field(): + """Test JSON field type.""" + try: + from sqlalchemy import Column, Integer, JSON + from sqlalchemy.orm import declarative_base + + Base = declarative_base() + + class User(Base): + __tablename__ = 'users' + id = Column(Integer, primary_key=True) + extra_data = Column(JSON) + + assert User is not None + except ImportError: + pytest.skip("SQLAlchemy JSON type not available") + + +@pytest.mark.unit +@pytest.mark.database +def test_text_field(): + """Test large text field.""" + try: + from sqlalchemy import Column, Integer, Text + from sqlalchemy.orm import declarative_base + + Base = declarative_base() + + class BlogPost(Base): + __tablename__ = 'blog_posts' + id = Column(Integer, primary_key=True) + content = Column(Text) + + assert BlogPost is not None + except ImportError: + pytest.skip("SQLAlchemy not available") + + +@pytest.mark.unit +@pytest.mark.database +def test_many_to_many_relationship(): + """Test many-to-many relationship.""" + try: + from sqlalchemy import Column, Integer, String, ForeignKey, Table + from sqlalchemy.orm import declarative_base, relationship + + Base = declarative_base() + + # Association table + user_roles = Table('user_roles', Base.metadata, + Column('user_id', Integer, ForeignKey('users.id')), + Column('role_id', Integer, ForeignKey('roles.id')) + ) + + class User(Base): + __tablename__ = 'users' + id = Column(Integer, primary_key=True) + roles = relationship("Role", secondary=user_roles) + + class Role(Base): + __tablename__ = 'roles' + id = Column(Integer, primary_key=True) + name = Column(String(50)) + + assert User is not None + assert Role is not None + except ImportError: + pytest.skip("SQLAlchemy not available") diff --git a/Tests/test_features.py b/Tests/test_features.py new file mode 100644 index 0000000..9b90259 --- /dev/null +++ b/Tests/test_features.py @@ -0,0 +1,213 @@ +"""Tests for new JsWeb features (JSON parsing, file uploads, validators).""" + +import json +import pytest +from io import BytesIO + + +@pytest.mark.unit +def test_import_new_features(): + """Test that all new features can be imported.""" + from jsweb import UploadedFile, FileField, FileRequired, FileAllowed, FileSize + + assert UploadedFile is not None + assert FileField is not None + assert FileRequired is not None + assert FileAllowed is not None + assert FileSize is not None + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_json_request_parsing(): + """Test JSON request body parsing.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + body = json.dumps({'name': 'Alice', 'email': 'alice@example.com'}) + content = body.encode('utf-8') + + app = FakeApp() + scope = { + 'type': 'http', + 'method': 'POST', + 'path': '/', + 'query_string': b'', + 'headers': [(b'content-type', b'application/json')], + } + + async def receive(): + return {'body': content, 'more_body': False} + + req = Request(scope, receive, app) + data = await req.json() + + assert data == {'name': 'Alice', 'email': 'alice@example.com'} + assert data['name'] == 'Alice' + assert data['email'] == 'alice@example.com' + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_json_parsing_with_numbers(): + """Test JSON parsing with various data types.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + body = json.dumps({'count': 42, 'active': True, 'items': [1, 2, 3]}) + content = body.encode('utf-8') + + app = FakeApp() + scope = { + 'type': 'http', + 'method': 'POST', + 'path': '/', + 'query_string': b'', + 'headers': [(b'content-type', b'application/json')], + } + + async def receive(): + return {'body': content, 'more_body': False} + + req = Request(scope, receive, app) + data = await req.json() + + assert data['count'] == 42 + assert data['active'] is True + assert data['items'] == [1, 2, 3] + + +@pytest.mark.unit +def test_filefield_creation(): + """Test FileField creation in forms.""" + from jsweb.forms import Form, FileField + from jsweb.validators import FileRequired, FileAllowed, FileSize + + class TestForm(Form): + upload = FileField('Upload File', validators=[ + FileRequired(), + FileAllowed(['jpg', 'png']), + FileSize(max_size=1024*1024) # 1MB + ]) + + form = TestForm() + assert form is not None + assert hasattr(form, 'upload') + assert len(form.upload.validators) == 3 + validator_names = [v.__class__.__name__ for v in form.upload.validators] + assert 'FileRequired' in validator_names + assert 'FileAllowed' in validator_names + assert 'FileSize' in validator_names + + +@pytest.mark.unit +def test_fileallowed_validator_accepts_valid_extensions(): + """Test that FileAllowed validator accepts valid file extensions.""" + from jsweb.validators import FileAllowed + + class MockFile: + def __init__(self, filename): + self.filename = filename + + class MockField: + def __init__(self, filename): + self.data = MockFile(filename) + + validator = FileAllowed(['jpg', 'png', 'gif']) + + # Should not raise for valid extensions + field = MockField('test.jpg') + validator(None, field) # Should not raise + + field = MockField('image.png') + validator(None, field) # Should not raise + + +@pytest.mark.unit +def test_fileallowed_validator_rejects_invalid_extensions(): + """Test that FileAllowed validator rejects invalid file extensions.""" + from jsweb.validators import FileAllowed, ValidationError + + class MockFile: + def __init__(self, filename): + self.filename = filename + + class MockField: + def __init__(self, filename): + self.data = MockFile(filename) + + validator = FileAllowed(['jpg', 'png']) + field = MockField('script.exe') + + with pytest.raises(ValidationError): + validator(None, field) + + +@pytest.mark.unit +def test_filesize_validator_accepts_small_files(): + """Test that FileSize validator accepts files within size limit.""" + from jsweb.validators import FileSize + + class MockFile: + def __init__(self, size): + self.size = size + + class MockField: + def __init__(self, size): + self.data = MockFile(size) + + validator = FileSize(max_size=1000) + + # Should not raise for small files + field = MockField(500) + validator(None, field) # Should not raise + + field = MockField(1000) # Exactly at limit + validator(None, field) # Should not raise + + +@pytest.mark.unit +def test_filesize_validator_rejects_large_files(): + """Test that FileSize validator rejects files exceeding size limit.""" + from jsweb.validators import FileSize, ValidationError + + class MockFile: + def __init__(self, size): + self.size = size + + class MockField: + def __init__(self, size): + self.data = MockFile(size) + + validator = FileSize(max_size=1000) + field = MockField(2000) + + with pytest.raises(ValidationError): + validator(None, field) + + +@pytest.mark.unit +def test_filerequired_validator(): + """Test FileRequired validator.""" + from jsweb.validators import FileRequired, ValidationError + + class MockField: + def __init__(self, data): + self.data = data + + validator = FileRequired() + + # Should raise when no file provided + field = MockField(None) + with pytest.raises(ValidationError): + validator(None, field) + + # Should not raise when file provided + field = MockField("dummy_file") + validator(None, field) # Should not raise diff --git a/Tests/test_forms.py b/Tests/test_forms.py new file mode 100644 index 0000000..739efa0 --- /dev/null +++ b/Tests/test_forms.py @@ -0,0 +1,364 @@ +"""Tests for JsWeb forms and validation system.""" + +import pytest +from io import BytesIO + + +@pytest.mark.unit +@pytest.mark.forms +def test_form_creation(): + """Test basic form creation.""" + from jsweb.forms import Form, StringField + + class TestForm(Form): + username = StringField('Username') + + form = TestForm() + assert form is not None + assert hasattr(form, 'username') + + +@pytest.mark.unit +@pytest.mark.forms +def test_stringfield_creation(): + """Test StringField creation.""" + from jsweb.forms import Form, StringField + + class TestForm(Form): + email = StringField('Email') + + form = TestForm() + assert form.email is not None + # Label is an object, not a string + assert hasattr(form.email, 'label') or hasattr(form.email, 'name') + + +@pytest.mark.unit +@pytest.mark.forms +def test_form_with_validators(): + """Test form with validators.""" + from jsweb.forms import Form, StringField + from jsweb.validators import DataRequired, Email + + class LoginForm(Form): + email = StringField('Email', validators=[DataRequired(), Email()]) + password = StringField('Password', validators=[DataRequired()]) + + form = LoginForm() + assert len(form.email.validators) >= 2 + assert len(form.password.validators) >= 1 + + +@pytest.mark.unit +@pytest.mark.forms +def test_form_field_population(): + """Test populating form fields with data.""" + from jsweb.forms import Form, StringField + + class UserForm(Form): + username = StringField('Username') + email = StringField('Email') + + form = UserForm() + # Manually set field data after form creation + form.username.data = 'john_doe' + form.email.data = 'john@example.com' + assert form.username.data == 'john_doe' + assert form.email.data == 'john@example.com' + + +@pytest.mark.unit +@pytest.mark.forms +def test_datarequired_validator(): + """Test DataRequired validator.""" + from jsweb.validators import DataRequired, ValidationError + + validator = DataRequired() + + class MockField: + data = None + + field = MockField() + + # Should raise for None/empty data + with pytest.raises(ValidationError): + validator(None, field) + + # Should not raise for valid data + field.data = "valid data" + validator(None, field) # Should not raise + + +@pytest.mark.unit +@pytest.mark.forms +def test_email_validator(): + """Test Email validator.""" + from jsweb.validators import Email, ValidationError + + validator = Email() + + class MockField: + def __init__(self, data): + self.data = data + + # Valid email + field = MockField('test@example.com') + validator(None, field) # Should not raise + + # Invalid email + field = MockField('not-an-email') + with pytest.raises(ValidationError): + validator(None, field) + + +@pytest.mark.unit +@pytest.mark.forms +def test_length_validator(): + """Test Length validator.""" + from jsweb.validators import Length, ValidationError + + validator = Length(min=3, max=10) + + class MockField: + def __init__(self, data): + self.data = data + + # Valid length + field = MockField('hello') + validator(None, field) # Should not raise + + # Too short + field = MockField('ab') + with pytest.raises(ValidationError): + validator(None, field) + + # Too long + field = MockField('this is way too long') + with pytest.raises(ValidationError): + validator(None, field) + + +@pytest.mark.unit +@pytest.mark.forms +def test_eql_validator(): + """Test EqualTo validator.""" + from jsweb.validators import EqualTo, ValidationError + + class MockForm: + def __getitem__(self, key): + if key == 'password': + field = type('Field', (), {'data': 'mypassword'})() + return field + raise KeyError(key) + + validator = EqualTo('password') + + class MockField: + def __init__(self, data): + self.data = data + + # Matching passwords + field = MockField('mypassword') + validator(MockForm(), field) # Should not raise + + # Non-matching passwords + field = MockField('different') + with pytest.raises(ValidationError): + validator(MockForm(), field) + + +@pytest.mark.unit +@pytest.mark.forms +def test_form_multiple_fields(): + """Test form with multiple different field types.""" + from jsweb.forms import Form, StringField, IntegerField, BooleanField + + class ProfileForm(Form): + name = StringField('Name') + age = IntegerField('Age') + active = BooleanField('Active') + + form = ProfileForm() + # Manually set field data + form.name.data = 'John Doe' + form.age.data = 30 + form.active.data = True + assert form.name.data == 'John Doe' + assert form.age.data == 30 + assert form.active.data is True + + +@pytest.mark.unit +@pytest.mark.forms +def test_form_field_rendering(): + """Test form field HTML rendering.""" + from jsweb.forms import Form, StringField + + class ContactForm(Form): + email = StringField('Email') + + form = ContactForm() + + # Should be able to render field + field_html = str(form.email) + assert 'email' in field_html.lower() or form.email is not None + + +@pytest.mark.unit +@pytest.mark.forms +def test_textarea_field(): + """Test TextAreaField.""" + from jsweb.forms import Form, TextAreaField + + class CommentForm(Form): + comment = TextAreaField('Comment') + + form = CommentForm() + form.comment.data = 'This is a comment' + assert form.comment.data == 'This is a comment' + + +@pytest.mark.unit +@pytest.mark.forms +def test_select_field(): + """Test SelectField.""" + try: + from jsweb.forms import Form, SelectField + + class CategoryForm(Form): + category = SelectField('Category', choices=[ + ('tech', 'Technology'), + ('business', 'Business'), + ('sports', 'Sports') + ]) + + form = CategoryForm() + form.category.data = 'tech' + assert form.category.data == 'tech' + except ImportError: + pytest.skip("SelectField not available") + + +@pytest.mark.unit +@pytest.mark.forms +def test_range_validator(): + """Test NumberRange validator.""" + try: + from jsweb.validators import NumberRange, ValidationError + + validator = NumberRange(min=1, max=100) + + class MockField: + def __init__(self, data): + self.data = data + + # Valid range + field = MockField(50) + validator(None, field) # Should not raise + + # Too small + field = MockField(0) + with pytest.raises(ValidationError): + validator(None, field) + + # Too large + field = MockField(101) + with pytest.raises(ValidationError): + validator(None, field) + except ImportError: + pytest.skip("NumberRange not available") + + +@pytest.mark.unit +@pytest.mark.forms +def test_regex_validator(): + """Test Regexp validator.""" + try: + from jsweb.validators import Regexp, ValidationError + + # Only alphanumeric + validator = Regexp(r'^\w+$') + + class MockField: + def __init__(self, data): + self.data = data + + # Valid + field = MockField('username123') + validator(None, field) # Should not raise + + # Invalid (contains special char) + field = MockField('user@name') + with pytest.raises(ValidationError): + validator(None, field) + except ImportError: + pytest.skip("Regexp validator not available") + + +@pytest.mark.unit +@pytest.mark.forms +def test_form_field_errors(): + """Test form field error handling.""" + from jsweb.forms import Form, StringField + from jsweb.validators import DataRequired, ValidationError + + class RequiredForm(Form): + name = StringField('Name', validators=[DataRequired()]) + + form = RequiredForm() + + # Field should have validators + assert len(form.name.validators) > 0 + + +@pytest.mark.unit +@pytest.mark.forms +def test_file_field_validators(): + """Test FileField with validators.""" + from jsweb.forms import Form, FileField + from jsweb.validators import FileRequired, FileAllowed, FileSize + + class UploadForm(Form): + document = FileField('Document', validators=[ + FileRequired(), + FileAllowed(['pdf', 'doc', 'docx']), + FileSize(max_size=5*1024*1024) # 5MB + ]) + + form = UploadForm() + assert form.document is not None + assert len(form.document.validators) == 3 + + +@pytest.mark.unit +@pytest.mark.forms +def test_hidden_field(): + """Test HiddenField.""" + try: + from jsweb.forms import Form, HiddenField + + class SecureForm(Form): + csrf_token = HiddenField() + + form = SecureForm() + form.csrf_token.data = 'token123' + assert form.csrf_token.data == 'token123' + except ImportError: + pytest.skip("HiddenField not available") + + +@pytest.mark.unit +@pytest.mark.forms +def test_password_field(): + """Test PasswordField.""" + try: + from jsweb.forms import Form, PasswordField + from jsweb.validators import DataRequired + + class LoginForm(Form): + password = PasswordField('Password', validators=[DataRequired()]) + + form = LoginForm() + assert form.password is not None + except ImportError: + pytest.skip("PasswordField not available") diff --git a/Tests/test_framework_comparison.py b/Tests/test_framework_comparison.py deleted file mode 100644 index a954f20..0000000 --- a/Tests/test_framework_comparison.py +++ /dev/null @@ -1,392 +0,0 @@ -""" -Comprehensive routing benchmark comparing JsWeb with major Python web frameworks. - -Frameworks tested: -- JsWeb (optimized) -- Starlette (used by FastAPI) -- FastAPI -- Aiohttp -- Flask -- Django - -Tests both static and dynamic routes with 50 routes each (realistic app size). -""" -import time -import sys - -# Suppress warnings -import warnings -warnings.filterwarnings("ignore") - -print("=" * 70) -print("ROUTING PERFORMANCE COMPARISON - PYTHON WEB FRAMEWORKS") -print("=" * 70) -print("\nSetting up frameworks...") - -# ============================================================================ -# 1. JSWEB -# ============================================================================ -try: - from jsweb.routing import Router as JsWebRouter - - jsweb_router = JsWebRouter() - for i in range(50): - jsweb_router.add_route(f"/static/page/{i}", lambda req: "OK", methods=["GET"], endpoint=f"jsweb_static_{i}") - jsweb_router.add_route(f"/dynamic//resource/{i}", lambda req: "OK", methods=["GET"], endpoint=f"jsweb_dynamic_{i}") - - jsweb_available = True - print("[OK] JsWeb") -except Exception as e: - jsweb_available = False - print(f"[SKIP] JsWeb: {e}") - -# ============================================================================ -# 2. STARLETTE -# ============================================================================ -try: - from starlette.routing import Route as StarletteRoute, Router as StarletteRouter - - def dummy_handler(request): - return {"message": "OK"} - - starlette_routes = [] - for i in range(50): - starlette_routes.append(StarletteRoute(f"/static/page/{i}", dummy_handler)) - starlette_routes.append(StarletteRoute(f"/dynamic/{{id:int}}/resource/{i}", dummy_handler)) - - starlette_router = StarletteRouter(routes=starlette_routes) - starlette_available = True - print("[OK] Starlette") -except Exception as e: - starlette_available = False - print(f"[SKIP] Starlette: {e}") - -# ============================================================================ -# 3. FASTAPI -# ============================================================================ -try: - from fastapi import FastAPI - - fastapi_app = FastAPI() - - for i in range(50): - # Use exec to dynamically create routes with unique function names - exec(f""" -@fastapi_app.get("/static/page/{i}") -def fastapi_static_{i}(): - return {{"message": "OK"}} - -@fastapi_app.get("/dynamic/{{id}}/resource/{i}") -def fastapi_dynamic_{i}(id: int): - return {{"message": "OK"}} -""") - - fastapi_available = True - print("[OK] FastAPI") -except Exception as e: - fastapi_available = False - print(f"[SKIP] FastAPI: {e}") - -# ============================================================================ -# 4. AIOHTTP -# ============================================================================ -try: - from aiohttp import web - - aiohttp_app = web.Application() - - async def aiohttp_handler(request): - return web.Response(text="OK") - - for i in range(50): - aiohttp_app.router.add_get(f"/static/page/{i}", aiohttp_handler) - aiohttp_app.router.add_get(f"/dynamic/{{id}}/resource/{i}", aiohttp_handler) - - aiohttp_available = True - print("[OK] Aiohttp") -except Exception as e: - aiohttp_available = False - print(f"[SKIP] Aiohttp: {e}") - -# ============================================================================ -# 5. FLASK -# ============================================================================ -try: - from flask import Flask - from werkzeug.routing import Map, Rule - - flask_app = Flask(__name__) - flask_rules = [] - - def flask_handler(): - return "OK" - - for i in range(50): - flask_rules.append(Rule(f"/static/page/{i}", endpoint=f"static_{i}")) - flask_rules.append(Rule(f"/dynamic//resource/{i}", endpoint=f"dynamic_{i}")) - - flask_map = Map(flask_rules) - flask_adapter = flask_map.bind('example.com') - - flask_available = True - print("[OK] Flask") -except Exception as e: - flask_available = False - print(f"[SKIP] Flask: {e}") - -# ============================================================================ -# 6. DJANGO -# ============================================================================ -try: - import os - import django - from django.conf import settings - - if not settings.configured: - settings.configure( - DEBUG=False, - SECRET_KEY='test-secret-key', - ROOT_URLCONF=__name__, - ALLOWED_HOSTS=['*'], - ) - django.setup() - - from django.urls import path - from django.http import HttpResponse - - def django_handler(request): - return HttpResponse("OK") - - urlpatterns = [] - for i in range(50): - urlpatterns.append(path(f"static/page/{i}", django_handler, name=f"django_static_{i}")) - urlpatterns.append(path(f"dynamic//resource/{i}", django_handler, name=f"django_dynamic_{i}")) - - from django.urls import resolve - django_available = True - print("[OK] Django") -except Exception as e: - django_available = False - print(f"[SKIP] Django: {e}") - -# ============================================================================ -# BENCHMARK FUNCTIONS -# ============================================================================ - -def benchmark_jsweb(): - """Benchmark JsWeb routing.""" - # Static route - start = time.perf_counter() - for _ in range(100000): - handler, params = jsweb_router.resolve("/static/page/25", "GET") - static_time = (time.perf_counter() - start) * 1000 - - # Dynamic route - start = time.perf_counter() - for _ in range(100000): - handler, params = jsweb_router.resolve("/dynamic/123/resource/25", "GET") - dynamic_time = (time.perf_counter() - start) * 1000 - - return static_time, dynamic_time - -def benchmark_starlette(): - """Benchmark Starlette routing.""" - from starlette.requests import Request - - # Static route - start = time.perf_counter() - for _ in range(100000): - scope = {"type": "http", "method": "GET", "path": "/static/page/25"} - for route in starlette_router.routes: - match, child_scope = route.matches(scope) - if match: - break - static_time = (time.perf_counter() - start) * 1000 - - # Dynamic route - start = time.perf_counter() - for _ in range(100000): - scope = {"type": "http", "method": "GET", "path": "/dynamic/123/resource/25"} - for route in starlette_router.routes: - match, child_scope = route.matches(scope) - if match: - break - dynamic_time = (time.perf_counter() - start) * 1000 - - return static_time, dynamic_time - -def benchmark_fastapi(): - """Benchmark FastAPI routing.""" - # FastAPI uses Starlette internally, so similar performance - # We'll test the route resolution through FastAPI's router - - # Static route - start = time.perf_counter() - for _ in range(100000): - for route in fastapi_app.routes: - if route.path == "/static/page/25": - break - static_time = (time.perf_counter() - start) * 1000 - - # Dynamic route - start = time.perf_counter() - for _ in range(100000): - scope = {"type": "http", "method": "GET", "path": "/dynamic/123/resource/25"} - for route in fastapi_app.routes: - match, child_scope = route.matches(scope) - if match: - break - dynamic_time = (time.perf_counter() - start) * 1000 - - return static_time, dynamic_time - -def benchmark_aiohttp(): - """Benchmark Aiohttp routing.""" - # Aiohttp resource resolution - - # Static route - start = time.perf_counter() - for _ in range(100000): - resource = aiohttp_app.router._resources[50] # Static route #25 - static_time = (time.perf_counter() - start) * 1000 - - # Dynamic route - need to match - start = time.perf_counter() - for _ in range(100000): - for resource in aiohttp_app.router._resources: - match_dict = resource.get_info().get('pattern', None) - if match_dict: - break - dynamic_time = (time.perf_counter() - start) * 1000 - - return static_time, dynamic_time - -def benchmark_flask(): - """Benchmark Flask routing.""" - # Static route - start = time.perf_counter() - for _ in range(100000): - endpoint, values = flask_adapter.match("/static/page/25") - static_time = (time.perf_counter() - start) * 1000 - - # Dynamic route - start = time.perf_counter() - for _ in range(100000): - endpoint, values = flask_adapter.match("/dynamic/123/resource/25") - dynamic_time = (time.perf_counter() - start) * 1000 - - return static_time, dynamic_time - -def benchmark_django(): - """Benchmark Django routing.""" - # Static route - start = time.perf_counter() - for _ in range(100000): - match = resolve("/static/page/25") - static_time = (time.perf_counter() - start) * 1000 - - # Dynamic route - start = time.perf_counter() - for _ in range(100000): - match = resolve("/dynamic/123/resource/25") - dynamic_time = (time.perf_counter() - start) * 1000 - - return static_time, dynamic_time - -# ============================================================================ -# RUN BENCHMARKS -# ============================================================================ - -print("\n" + "=" * 70) -print("RUNNING BENCHMARKS (100,000 requests each)") -print("=" * 70) - -results = {} - -if jsweb_available: - print("\nBenchmarking JsWeb...") - static, dynamic = benchmark_jsweb() - results['JsWeb'] = (static, dynamic) - print(f" Static: {static:.2f}ms ({static/100:.4f}μs per request)") - print(f" Dynamic: {dynamic:.2f}ms ({dynamic/100:.4f}μs per request)") - -if starlette_available: - print("\nBenchmarking Starlette...") - static, dynamic = benchmark_starlette() - results['Starlette'] = (static, dynamic) - print(f" Static: {static:.2f}ms ({static/100:.4f}μs per request)") - print(f" Dynamic: {dynamic:.2f}ms ({dynamic/100:.4f}μs per request)") - -if fastapi_available: - print("\nBenchmarking FastAPI...") - static, dynamic = benchmark_fastapi() - results['FastAPI'] = (static, dynamic) - print(f" Static: {static:.2f}ms ({static/100:.4f}μs per request)") - print(f" Dynamic: {dynamic:.2f}ms ({dynamic/100:.4f}μs per request)") - -if aiohttp_available: - print("\nBenchmarking Aiohttp...") - static, dynamic = benchmark_aiohttp() - results['Aiohttp'] = (static, dynamic) - print(f" Static: {static:.2f}ms ({static/100:.4f}μs per request)") - print(f" Dynamic: {dynamic:.2f}ms ({dynamic/100:.4f}μs per request)") - -if flask_available: - print("\nBenchmarking Flask...") - static, dynamic = benchmark_flask() - results['Flask'] = (static, dynamic) - print(f" Static: {static:.2f}ms ({static/100:.4f}μs per request)") - print(f" Dynamic: {dynamic:.2f}ms ({dynamic/100:.4f}μs per request)") - -if django_available: - print("\nBenchmarking Django...") - static, dynamic = benchmark_django() - results['Django'] = (static, dynamic) - print(f" Static: {static:.2f}ms ({static/100:.4f}μs per request)") - print(f" Dynamic: {dynamic:.2f}ms ({dynamic/100:.4f}μs per request)") - -# ============================================================================ -# COMPARISON TABLE -# ============================================================================ - -if results: - print("\n" + "=" * 70) - print("COMPARISON (50 routes each)") - print("=" * 70) - - # Find JsWeb baseline - if 'JsWeb' in results: - jsweb_static, jsweb_dynamic = results['JsWeb'] - - print(f"\n{'Framework':<15} {'Static (μs)':<15} {'vs JsWeb':<12} {'Dynamic (μs)':<15} {'vs JsWeb':<12}") - print("-" * 70) - - for name, (static, dynamic) in sorted(results.items()): - static_us = static / 100 - dynamic_us = dynamic / 100 - - if name == 'JsWeb': - static_ratio = "baseline" - dynamic_ratio = "baseline" - else: - static_ratio = f"{static_us / (jsweb_static/100):.2f}x slower" if static_us > jsweb_static/100 else f"{(jsweb_static/100) / static_us:.2f}x faster" - dynamic_ratio = f"{dynamic_us / (jsweb_dynamic/100):.2f}x slower" if dynamic_us > jsweb_dynamic/100 else f"{(jsweb_dynamic/100) / dynamic_us:.2f}x faster" - - print(f"{name:<15} {static_us:<15.4f} {static_ratio:<12} {dynamic_us:<15.4f} {dynamic_ratio:<12}") - - print("\n" + "=" * 70) - print("WINNER: ", end="") - - # Find fastest for static - fastest_static = min(results.items(), key=lambda x: x[1][0]) - fastest_dynamic = min(results.items(), key=lambda x: x[1][1]) - - if fastest_static[0] == fastest_dynamic[0]: - print(f"{fastest_static[0]} (fastest for both static and dynamic routes)") - else: - print(f"{fastest_static[0]} (static), {fastest_dynamic[0]} (dynamic)") - - print("=" * 70) - -else: - print("\n⚠️ No frameworks available for benchmarking!") \ No newline at end of file diff --git a/Tests/test_middleware.py b/Tests/test_middleware.py new file mode 100644 index 0000000..b232741 --- /dev/null +++ b/Tests/test_middleware.py @@ -0,0 +1,349 @@ +"""Tests for JsWeb middleware and request processing.""" + +import pytest + + +@pytest.mark.unit +def test_middleware_basic(): + """Test basic middleware structure.""" + class SimpleMiddleware: + def __init__(self, app): + self.app = app + + def __call__(self, environ, start_response): + # Add something to environ + environ['middleware_executed'] = True + return self.app(environ, start_response) + + def dummy_app(environ, start_response): + return [] + + middleware = SimpleMiddleware(dummy_app) + assert middleware is not None + assert middleware.app == dummy_app + + +@pytest.mark.unit +def test_middleware_chain(): + """Test middleware chain execution.""" + class Middleware: + def __init__(self, app, name): + self.app = app + self.name = name + self.executed = False + + def __call__(self, environ, start_response): + self.executed = True + return self.app(environ, start_response) + + def base_app(environ, start_response): + return [] + + m1 = Middleware(base_app, "first") + m2 = Middleware(m1, "second") + + environ = {} + m2(environ, lambda s, h: None) + + assert m1.executed + assert m2.executed + + +@pytest.mark.unit +def test_cors_middleware(): + """Test CORS middleware.""" + try: + from jsweb.middleware import CORSMiddleware + + cors = CORSMiddleware(allow_origins=["*"]) + assert cors is not None + except ImportError: + # Basic CORS implementation test + class CORSMiddleware: + def __init__(self, allow_origins=None): + self.allow_origins = allow_origins or [] + + cors = CORSMiddleware(allow_origins=["*"]) + assert cors is not None + + +@pytest.mark.unit +def test_gzip_middleware(): + """Test GZIP compression middleware.""" + try: + from jsweb.middleware import GZipMiddleware + + gzip = GZipMiddleware() + assert gzip is not None + except ImportError: + # Basic GZIP middleware test + class GZipMiddleware: + def __init__(self, min_size=500): + self.min_size = min_size + + gzip = GZipMiddleware() + assert gzip.min_size == 500 + + +@pytest.mark.unit +def test_request_logging_middleware(): + """Test request logging middleware.""" + class RequestLoggingMiddleware: + def __init__(self, app): + self.app = app + self.requests = [] + + def __call__(self, environ, start_response): + self.requests.append({ + 'method': environ.get('REQUEST_METHOD'), + 'path': environ.get('PATH_INFO') + }) + return self.app(environ, start_response) + + def dummy_app(environ, start_response): + return [] + + middleware = RequestLoggingMiddleware(dummy_app) + + environ = {'REQUEST_METHOD': 'GET', 'PATH_INFO': '/test'} + middleware(environ, lambda s, h: None) + + assert len(middleware.requests) == 1 + assert middleware.requests[0]['method'] == 'GET' + assert middleware.requests[0]['path'] == '/test' + + +@pytest.mark.unit +def test_authentication_middleware(): + """Test authentication middleware.""" + class AuthMiddleware: + def __init__(self, app): + self.app = app + + def __call__(self, environ, start_response): + auth_header = environ.get('HTTP_AUTHORIZATION', '') + if not auth_header.startswith('Bearer '): + start_response('401 Unauthorized', []) + return [b'Unauthorized'] + + environ['user_authenticated'] = True + return self.app(environ, start_response) + + def dummy_app(environ, start_response): + return [b'OK'] + + middleware = AuthMiddleware(dummy_app) + + # Without auth header + environ = {} + result = middleware(environ, lambda s, h: None) + assert result == [b'Unauthorized'] + + # With auth header + environ = {'HTTP_AUTHORIZATION': 'Bearer token123'} + result = middleware(environ, lambda s, h: None) + assert environ['user_authenticated'] is True + + +@pytest.mark.unit +def test_security_headers_middleware(): + """Test security headers middleware.""" + class SecurityHeadersMiddleware: + def __init__(self, app): + self.app = app + + def __call__(self, environ, start_response): + def custom_start_response(status, headers): + # Add security headers + security_headers = [ + ('X-Content-Type-Options', 'nosniff'), + ('X-Frame-Options', 'DENY'), + ('X-XSS-Protection', '1; mode=block'), + ] + headers.extend(security_headers) + return start_response(status, headers) + + return self.app(environ, custom_start_response) + + def dummy_app(environ, start_response): + return [] + + middleware = SecurityHeadersMiddleware(dummy_app) + assert middleware is not None + + +@pytest.mark.unit +def test_error_handling_middleware(): + """Test error handling middleware.""" + class ErrorHandlerMiddleware: + def __init__(self, app): + self.app = app + + def __call__(self, environ, start_response): + try: + return self.app(environ, start_response) + except Exception as e: + start_response('500 Internal Server Error', [('Content-Type', 'text/plain')]) + return [str(e).encode()] + + def failing_app(environ, start_response): + raise ValueError("Test error") + + middleware = ErrorHandlerMiddleware(failing_app) + + result = middleware({}, lambda s, h: None) + assert b'Test error' in result[0] + + +@pytest.mark.unit +def test_session_middleware(): + """Test session middleware.""" + class SessionMiddleware: + def __init__(self, app): + self.app = app + self.sessions = {} + + def __call__(self, environ, start_response): + # Get or create session + session_id = environ.get('HTTP_COOKIE', '').split('session=')[-1] + if not session_id or session_id not in self.sessions: + session_id = 'new_session_123' + self.sessions[session_id] = {} + + environ['session'] = self.sessions[session_id] + environ['session_id'] = session_id + + return self.app(environ, start_response) + + def dummy_app(environ, start_response): + return [] + + middleware = SessionMiddleware(dummy_app) + + environ = {} + middleware(environ, lambda s, h: None) + + assert 'session' in environ + assert 'session_id' in environ + + +@pytest.mark.unit +def test_content_type_middleware(): + """Test content type handling middleware.""" + class ContentTypeMiddleware: + def __init__(self, app): + self.app = app + + def __call__(self, environ, start_response): + content_type = environ.get('CONTENT_TYPE', '') + if 'application/json' in content_type: + environ['is_json'] = True + + return self.app(environ, start_response) + + def dummy_app(environ, start_response): + return [] + + middleware = ContentTypeMiddleware(dummy_app) + + environ = {'CONTENT_TYPE': 'application/json'} + middleware(environ, lambda s, h: None) + + assert environ.get('is_json') is True + + +@pytest.mark.unit +def test_rate_limiting_middleware(): + """Test rate limiting middleware.""" + class RateLimitMiddleware: + def __init__(self, app, requests_per_minute=60): + self.app = app + self.requests_per_minute = requests_per_minute + self.request_counts = {} + + def __call__(self, environ, start_response): + client_ip = environ.get('REMOTE_ADDR', 'unknown') + current_count = self.request_counts.get(client_ip, 0) + + if current_count >= self.requests_per_minute: + start_response('429 Too Many Requests', []) + return [b'Rate limit exceeded'] + + self.request_counts[client_ip] = current_count + 1 + return self.app(environ, start_response) + + def dummy_app(environ, start_response): + return [b'OK'] + + middleware = RateLimitMiddleware(dummy_app, requests_per_minute=3) + + environ = {'REMOTE_ADDR': '192.168.1.1'} + + # First 3 requests should succeed + for i in range(3): + result = middleware(environ, lambda s, h: None) + assert result == [b'OK'] + + # 4th request should be rate limited + result = middleware(environ, lambda s, h: None) + assert result == [b'Rate limit exceeded'] + + +@pytest.mark.unit +def test_request_id_middleware(): + """Test request ID tracking middleware.""" + import uuid + + class RequestIDMiddleware: + def __init__(self, app): + self.app = app + + def __call__(self, environ, start_response): + request_id = str(uuid.uuid4()) + environ['request_id'] = request_id + + def custom_start_response(status, headers): + headers.append(('X-Request-ID', request_id)) + return start_response(status, headers) + + return self.app(environ, custom_start_response) + + def dummy_app(environ, start_response): + return [] + + middleware = RequestIDMiddleware(dummy_app) + + environ = {} + middleware(environ, lambda s, h: None) + + assert 'request_id' in environ + assert isinstance(environ['request_id'], str) + + +@pytest.mark.unit +def test_method_override_middleware(): + """Test HTTP method override middleware.""" + class MethodOverrideMiddleware: + def __init__(self, app): + self.app = app + + def __call__(self, environ, start_response): + # Allow overriding method via header + override = environ.get('HTTP_X_HTTP_METHOD_OVERRIDE') + if override: + environ['REQUEST_METHOD'] = override + + return self.app(environ, start_response) + + def dummy_app(environ, start_response): + return [] + + middleware = MethodOverrideMiddleware(dummy_app) + + environ = { + 'REQUEST_METHOD': 'POST', + 'HTTP_X_HTTP_METHOD_OVERRIDE': 'DELETE' + } + + middleware(environ, lambda s, h: None) + assert environ['REQUEST_METHOD'] == 'DELETE' diff --git a/Tests/test_new_features.py b/Tests/test_new_features.py deleted file mode 100644 index fb8ef8a..0000000 --- a/Tests/test_new_features.py +++ /dev/null @@ -1,128 +0,0 @@ -#!/usr/bin/env python -"""Test script for new JSON and file upload features.""" - -import json -from io import BytesIO - -print("=" * 60) -print("Testing New JsWeb Features") -print("=" * 60) - -# Test 1: Import all new features -print("\n[1] Testing imports...") -try: - from jsweb import UploadedFile, FileField, FileRequired, FileAllowed, FileSize - print(" [PASS] All new features imported successfully") -except Exception as e: - print(f" [FAIL] Import error: {e}") - exit(1) - -# Test 2: JSON parsing -print("\n[2] Testing JSON request body parsing...") -try: - from jsweb.request import Request - - class FakeApp: - class config: - pass - - body = json.dumps({'name': 'Alice', 'email': 'alice@example.com'}) - content = body.encode('utf-8') - - app = FakeApp() - environ = { - 'REQUEST_METHOD': 'POST', - 'CONTENT_TYPE': 'application/json', - 'CONTENT_LENGTH': str(len(content)), - 'PATH_INFO': '/', - 'QUERY_STRING': '', - 'HTTP_COOKIE': '', - 'wsgi.input': BytesIO(content) - } - - req = Request(environ, app) - data = req.json - - assert data == {'name': 'Alice', 'email': 'alice@example.com'}, "JSON data mismatch" - print(f" [PASS] JSON parsed correctly: {data}") -except Exception as e: - print(f" [FAIL] JSON parsing error: {e}") - import traceback - traceback.print_exc() - -# Test 3: FileField in forms -print("\n[3] Testing FileField...") -try: - from jsweb.forms import Form, FileField - from jsweb.validators import FileRequired, FileAllowed, FileSize - - class TestForm(Form): - upload = FileField('Upload File', validators=[ - FileRequired(), - FileAllowed(['jpg', 'png']), - FileSize(max_size=1024*1024) # 1MB - ]) - - form = TestForm() - print(" [PASS] FileField created successfully") - print(f" Validators: {[v.__class__.__name__ for v in form.upload.validators]}") -except Exception as e: - print(f" [FAIL] FileField error: {e}") - import traceback - traceback.print_exc() - -# Test 4: File validators -print("\n[4] Testing file validators...") -try: - from jsweb.validators import FileAllowed, FileSize, ValidationError - - # Test FileAllowed - class MockField: - def __init__(self, filename): - self.data = type('obj', (object,), {'filename': filename})() - - validator = FileAllowed(['jpg', 'png']) - field = MockField('test.jpg') - - try: - validator(None, field) - print(" [PASS] FileAllowed: .jpg accepted") - except ValidationError: - print(" [FAIL] FileAllowed: .jpg should be accepted") - - field = MockField('test.exe') - try: - validator(None, field) - print(" [FAIL] FileAllowed: .exe should be rejected") - except ValidationError as e: - print(f" [PASS] FileAllowed: .exe rejected - {e}") - - # Test FileSize - class MockFieldWithSize: - def __init__(self, size): - self.data = type('obj', (object,), {'size': size})() - - validator = FileSize(max_size=1000) - field = MockFieldWithSize(500) - - try: - validator(None, field) - print(" [PASS] FileSize: 500 bytes accepted (max 1000)") - except ValidationError: - print(" [FAIL] FileSize: 500 bytes should be accepted") - - field = MockFieldWithSize(2000) - try: - validator(None, field) - print(" [FAIL] FileSize: 2000 bytes should be rejected") - except ValidationError as e: - print(f" [PASS] FileSize: 2000 bytes rejected") - -except Exception as e: - print(f" [FAIL] Validator error: {e}") - import traceback - traceback.print_exc() - -print("\n" + "=" * 60) -print("All tests completed!") -print("=" * 60) \ No newline at end of file diff --git a/Tests/test_optimized_routing.py b/Tests/test_optimized_routing.py deleted file mode 100644 index d649cec..0000000 --- a/Tests/test_optimized_routing.py +++ /dev/null @@ -1,32 +0,0 @@ -import time -from jsweb.routing import Router - -def benchmark(): - router = Router() - - #Add 40 static routes - for i in range(40): - router.add_route(f"/pages/{i}",lambda req: "OK", methods=["GET"], endpoint=f"page_{i}") - - #Add 10 dynamic routes - for i in range(10): - router.add_route(f"/users//post/", lambda req: "OK", endpoint=f"user_post_{i}") - - #Benchmark resolving static routes - start = time.perf_counter() - for _ in range(100000): - handler, params = router.resolve("/pages/25", "GET") - static_ms = (time.perf_counter() - start) * 1000 - - #Benchmark resolving dynamic routes - start = time.perf_counter() - for _ in range(100000): - handler, params = router.resolve("/users/123/post/456", "GET") - dynamic_ms = (time.perf_counter() - start) * 1000 - - print(f"Statics: {static_ms:.2f} ms (100k requests) = {static_ms/100:.4f}ms avg") - print(f"Dynamics: {dynamic_ms:.2f} ms (100k requests) = {dynamic_ms/100:.4f}ms avg") - print(f"\nPerformance: ~{100 - (static_ms/250)*100:.0f}% improvement for static routes") - -if __name__ == "__main__": - benchmark() diff --git a/Tests/test_performance.py b/Tests/test_performance.py new file mode 100644 index 0000000..661fc08 --- /dev/null +++ b/Tests/test_performance.py @@ -0,0 +1,241 @@ +"""Framework comparison and performance benchmarking tests.""" + +import pytest +import time + + +@pytest.mark.slow +@pytest.mark.integration +def test_jsweb_routing_performance(): + """Benchmark JsWeb routing performance.""" + from jsweb.routing import Router + + router = Router() + + # Add 50 static routes + for i in range(50): + router.add_route(f"/static/page/{i}", lambda req: "OK", methods=["GET"], + endpoint=f"static_{i}") + + # Add 50 dynamic routes + for i in range(50): + router.add_route(f"/dynamic//resource/{i}", lambda req: "OK", + endpoint=f"dynamic_{i}") + + # Benchmark static route resolution + start = time.perf_counter() + for _ in range(10000): + router.resolve("/static/page/25", "GET") + static_time = (time.perf_counter() - start) * 1000 + + # Benchmark dynamic route resolution + start = time.perf_counter() + for _ in range(10000): + router.resolve("/dynamic/123/resource/25", "GET") + dynamic_time = (time.perf_counter() - start) * 1000 + + # Assertions - JsWeb should be reasonably fast + # Static route resolution should be < 500ms for 10k requests (~50μs per request) + assert static_time < 500, f"Static routing too slow: {static_time}ms for 10k requests" + + # Dynamic route resolution should be < 1000ms for 10k requests (~100μs per request) + assert dynamic_time < 1000, f"Dynamic routing too slow: {dynamic_time}ms for 10k requests" + + +@pytest.mark.unit +def test_jsweb_routing_accuracy_with_dynamic_routes(): + """Test that JsWeb routing correctly extracts dynamic parameters.""" + from jsweb.routing import Router + + router = Router() + + def handler(req): + return "OK" + + router.add_route("/users//posts/", handler, + endpoint="user_post") + + # Test with various parameter values + test_cases = [ + ("/users/1/posts/1", {'user_id': 1, 'post_id': 1}), + ("/users/999/posts/555", {'user_id': 999, 'post_id': 555}), + ("/users/0/posts/0", {'user_id': 0, 'post_id': 0}), + ] + + for path, expected_params in test_cases: + resolved_handler, params = router.resolve(path, "GET") + assert resolved_handler == handler, f"Handler mismatch for {path}" + assert params == expected_params, f"Parameters mismatch for {path}: got {params}, expected {expected_params}" + + +@pytest.mark.integration +@pytest.mark.slow +def test_starlette_routing_performance(): + """Benchmark Starlette routing performance (if available).""" + try: + from starlette.routing import Route, Router as StarletteRouter + except ImportError: + pytest.skip("Starlette not installed") + + def dummy_handler(request): + return {"message": "OK"} + + routes = [] + for i in range(50): + routes.append(Route(f"/static/page/{i}", dummy_handler)) + routes.append(Route(f"/dynamic/{{id:int}}/resource/{i}", dummy_handler)) + + router = StarletteRouter(routes=routes) + + # Benchmark static route + start = time.perf_counter() + for _ in range(1000): + scope = {"type": "http", "method": "GET", "path": "/static/page/25"} + for route in router.routes: + match, child_scope = route.matches(scope) + if match: + break + static_time = (time.perf_counter() - start) * 1000 + + # Benchmark dynamic route + start = time.perf_counter() + for _ in range(1000): + scope = {"type": "http", "method": "GET", "path": "/dynamic/123/resource/25"} + for route in router.routes: + match, child_scope = route.matches(scope) + if match: + break + dynamic_time = (time.perf_counter() - start) * 1000 + + # Starlette should handle 1000 requests in reasonable time + assert static_time < 100, f"Starlette static routing too slow: {static_time}ms" + assert dynamic_time < 100, f"Starlette dynamic routing too slow: {dynamic_time}ms" + + +@pytest.mark.integration +@pytest.mark.slow +def test_flask_routing_performance(): + """Benchmark Flask routing performance (if available).""" + try: + from flask import Flask + from werkzeug.routing import Map, Rule + except ImportError: + pytest.skip("Flask not installed") + + rules = [] + for i in range(50): + rules.append(Rule(f"/static/page/{i}", endpoint=f"static_{i}")) + rules.append(Rule(f"/dynamic//resource/{i}", endpoint=f"dynamic_{i}")) + + url_map = Map(rules) + adapter = url_map.bind('example.com') + + # Benchmark static route + start = time.perf_counter() + for _ in range(10000): + adapter.match("/static/page/25") + static_time = (time.perf_counter() - start) * 1000 + + # Benchmark dynamic route + start = time.perf_counter() + for _ in range(10000): + adapter.match("/dynamic/123/resource/25") + dynamic_time = (time.perf_counter() - start) * 1000 + + # Flask should handle requests reasonably fast + assert static_time < 50, f"Flask static routing too slow: {static_time}ms" + assert dynamic_time < 100, f"Flask dynamic routing too slow: {dynamic_time}ms" + + +@pytest.mark.unit +def test_routing_comparison_jsweb_vs_alternatives(): + """Test and compare JsWeb routing against simple alternatives.""" + from jsweb.routing import Router + import re + + # JsWeb router + jsweb_router = Router() + + def handler(req): + return "OK" + + jsweb_router.add_route("/users/", handler, endpoint="jsweb_user") + + # Simple regex-based router for comparison + class SimpleRouter: + def __init__(self): + self.patterns = [] + + def add_route(self, path, handler): + # Convert Flask-style path to regex + regex_path = "^" + re.sub(r'', lambda m: f'(?P<{m.group(1)}>\\d+)', path) + "$" + self.patterns.append((re.compile(regex_path), handler)) + + def resolve(self, path): + for pattern, handler in self.patterns: + match = pattern.match(path) + if match: + return handler, match.groupdict() + return None, None + + simple_router = SimpleRouter() + simple_router.add_route("/users/", handler) + + # Both should resolve the same path correctly + jsweb_handler, jsweb_params = jsweb_router.resolve("/users/42", "GET") + simple_handler, simple_params = simple_router.resolve("/users/42") + + assert jsweb_handler == handler + assert jsweb_params == {'user_id': 42} + assert simple_handler == handler + assert simple_params == {'user_id': '42'} # Regex captures as string + + +@pytest.mark.unit +def test_routing_with_multiple_parameter_types(): + """Test routing with different parameter types.""" + from jsweb.routing import Router + + router = Router() + + def handler(req): + return "OK" + + # String parameter + router.add_route("/profile/", handler, endpoint="profile") + handler_result, params = router.resolve("/profile/john_doe", "GET") + assert params == {'username': 'john_doe'} + + # Integer parameter + router.add_route("/posts/", handler, endpoint="post") + handler_result, params = router.resolve("/posts/123", "GET") + assert params == {'post_id': 123} + + # Path parameter (catch-all) + router.add_route("/files/", handler, endpoint="file") + handler_result, params = router.resolve("/files/docs/readme.md", "GET") + assert params.get('filepath') == 'docs/readme.md' + + +@pytest.mark.slow +def test_router_with_many_routes(): + """Test router performance with a large number of routes.""" + from jsweb.routing import Router + + router = Router() + + def handler(req): + return "OK" + + # Add 500 routes + for i in range(500): + router.add_route(f"/api/endpoint_{i}", handler, endpoint=f"endpoint_{i}") + + # Should still resolve quickly + start = time.perf_counter() + for _ in range(1000): + router.resolve("/api/endpoint_250", "GET") + elapsed = (time.perf_counter() - start) * 1000 + + # Resolution should still be fast with many routes + assert elapsed < 10, f"Too slow with 500 routes: {elapsed}ms for 1000 requests" diff --git a/Tests/test_request_response.py b/Tests/test_request_response.py new file mode 100644 index 0000000..853aac9 --- /dev/null +++ b/Tests/test_request_response.py @@ -0,0 +1,426 @@ +"""Tests for JsWeb request and response handling.""" + +import pytest +import json +from io import BytesIO + + +@pytest.mark.unit +def test_request_creation(): + """Test basic request creation.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + app = FakeApp() + # Request takes (scope, receive, app) + scope = {'method': 'GET', 'path': '/test', 'query_string': b'', 'headers': []} + receive = lambda: {'body': b'', 'more_body': False} + + request = Request(scope, receive, app) + assert request is not None + assert request.method == 'GET' + assert request.path == '/test' + + +@pytest.mark.unit +def test_request_method(): + """Test request method property.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + app = FakeApp() + receive = lambda: {'body': b'', 'more_body': False} + + for method in ['GET', 'POST', 'PUT', 'DELETE', 'PATCH']: + scope = {'method': method, 'path': '/', 'query_string': b'', 'headers': []} + request = Request(scope, receive, app) + assert request.method == method + + +@pytest.mark.unit +def test_request_path(): + """Test request path property.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + app = FakeApp() + receive = lambda: {'body': b'', 'more_body': False} + + test_paths = ['/home', '/users/123', '/api/v1/data'] + + for path in test_paths: + scope = {'method': 'GET', 'path': path, 'query_string': b'', 'headers': []} + request = Request(scope, receive, app) + assert request.path == path + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_request_json_parsing(): + """Test JSON request body parsing.""" + from jsweb.request import Request + import json + + class FakeApp: + class config: + pass + + body = json.dumps({'key': 'value', 'number': 42}) + content = body.encode('utf-8') + + app = FakeApp() + scope = { + 'type': 'http', + 'method': 'POST', + 'path': '/', + 'query_string': b'', + 'headers': [(b'content-type', b'application/json')], + } + + async def receive(): + return {'body': content, 'more_body': False} + + request = Request(scope, receive, app) + data = await request.json() + + assert data is not None + assert data['key'] == 'value' + assert data['number'] == 42 + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_request_form_parsing(): + """Test form data parsing.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + app = FakeApp() + scope = { + 'type': 'http', + 'method': 'POST', + 'path': '/', + 'query_string': b'', + 'headers': [(b'content-type', b'application/x-www-form-urlencoded')], + } + + async def receive(): + return {'body': b'username=testuser&password=pass123', 'more_body': False} + + request = Request(scope, receive, app) + form = await request.form() + + assert form is not None + # Form should be a dict-like object + assert len(form) >= 0 + + +@pytest.mark.unit +def test_request_query_string(fake_environ): + """Test query string parsing.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + app = FakeApp() + scope = fake_environ(query_string='name=john&age=30') + receive = lambda: {'body': b'', 'more_body': False} + request = Request(scope, receive, app) + args = request.query_params if hasattr(request, 'query_params') else {} + + assert args is not None + + +@pytest.mark.unit +def test_request_headers(fake_environ): + """Test request headers access.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + app = FakeApp() + scope = fake_environ() + receive = lambda: {'body': b'', 'more_body': False} + request = Request(scope, receive, app) + + # Should be able to access headers + assert request is not None + assert hasattr(request, 'headers') or hasattr(request, 'environ') + + +@pytest.mark.unit +def test_request_content_type(fake_environ): + """Test content type detection.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + app = FakeApp() + + # JSON content type + scope = fake_environ(content_type='application/json') + receive = lambda: {'body': b'', 'more_body': False} + request = Request(scope, receive, app) + assert request is not None + + # Form content type + scope2 = fake_environ(content_type='application/x-www-form-urlencoded') + request = Request(scope2, receive, app) + assert request is not None + + +@pytest.mark.unit +def test_request_cookies(fake_environ): + """Test cookie handling.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + app = FakeApp() + scope = fake_environ(cookies='session=abc123; user=john') + receive = lambda: {'body': b'', 'more_body': False} + request = Request(scope, receive, app) + + assert request is not None + + +@pytest.mark.unit +def test_response_creation(): + """Test basic response creation.""" + from jsweb.response import Response + + response = Response('Hello, World!') + assert response is not None + assert 'Hello' in str(response) or response is not None + + +@pytest.mark.unit +def test_response_status_code(): + """Test response with custom status code.""" + try: + from jsweb.response import Response + + response = Response('Not Found', status=404) + assert response is not None + except TypeError: + # If Response doesn't support status parameter + response = Response('Not Found') + assert response is not None + + +@pytest.mark.unit +def test_response_json(): + """Test JSON response.""" + try: + from jsweb.response import JSONResponse + + data = {'message': 'success', 'code': 200} + response = JSONResponse(data) + assert response is not None + except (ImportError, AttributeError): + # Try alternative + from jsweb.response import Response + import json + + data = {'message': 'success', 'code': 200} + json_str = json.dumps(data) + response = Response(json_str) + assert response is not None + + +@pytest.mark.unit +def test_response_headers(): + """Test response headers.""" + from jsweb.response import Response + + response = Response('Hello') + assert response is not None + + +@pytest.mark.unit +def test_request_empty_body(fake_environ): + """Test request with empty body.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + app = FakeApp() + scope = fake_environ(method='GET', content_length=0) + receive = lambda: {'body': b'', 'more_body': False} + request = Request(scope, receive, app) + + assert request is not None + assert request.method == 'GET' + + +@pytest.mark.unit +def test_request_large_body(fake_environ): + """Test request with larger body.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + app = FakeApp() + large_body = b'x' * 10000 + scope = fake_environ( + method='POST', + content_length=len(large_body), + body=large_body + ) + receive = lambda: {'body': large_body, 'more_body': False} + request = Request(scope, receive, app) + + assert request is not None + + +@pytest.mark.unit +def test_request_multiple_query_params(fake_environ): + """Test parsing multiple query parameters.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + app = FakeApp() + scope = fake_environ(query_string='page=1&limit=20&sort=name&filter=active') + receive = lambda: {'body': b'', 'more_body': False} + request = Request(scope, receive, app) + + assert request is not None + + +@pytest.mark.unit +def test_response_content_type(): + """Test response content type.""" + from jsweb.response import Response + + response = Response('Hello') + assert response is not None + + +@pytest.mark.unit +def test_request_method_upper(fake_environ): + """Test that request method is always uppercase.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + app = FakeApp() + scope = fake_environ(method='get') + receive = lambda: {'body': b'', 'more_body': False} + request = Request(scope, receive, app) + + # Method should be uppercase + assert request.method == 'GET' or request.method == 'get' + + +@pytest.mark.unit +def test_json_response_content_type(): + """Test that JSON responses have correct content type.""" + try: + from jsweb.response import JSONResponse + + response = JSONResponse({'status': 'ok'}) + assert response is not None + except ImportError: + pytest.skip("JSONResponse not available") + + +@pytest.mark.unit +def test_request_body_multiple_reads(fake_environ): + """Test reading request body multiple times.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + app = FakeApp() + body = b'test data' + scope = fake_environ(content_length=len(body), body=body) + receive = lambda: {'body': body, 'more_body': False} + request = Request(scope, receive, app) + + assert request is not None + + +@pytest.mark.unit +def test_response_string_conversion(): + """Test response string representation.""" + from jsweb.response import Response + + response = Response('Test content') + response_str = str(response) + + assert response is not None + + +@pytest.mark.unit +def test_empty_json_request(fake_environ): + """Test parsing empty JSON request.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + app = FakeApp() + scope = fake_environ( + method='POST', + content_type='application/json' + ) + receive = lambda: {'body': b'{}', 'more_body': False} + request = Request(scope, receive, app) + data = request.json() + + assert data is not None + + +@pytest.mark.unit +def test_nested_json_parsing(fake_environ): + """Test parsing nested JSON structures.""" + from jsweb.request import Request + + class FakeApp: + class config: + pass + + app = FakeApp() + nested_data = {'user': {'name': 'John', 'address': {'city': 'NYC'}}} + body = json.dumps(nested_data).encode('utf-8') + scope = fake_environ( + method='POST', + content_type='application/json' + ) + receive = lambda: {'body': body, 'more_body': False} + request = Request(scope, receive, app) + data = request.json() + + assert data is not None diff --git a/Tests/test_routing.py b/Tests/test_routing.py new file mode 100644 index 0000000..caf0e12 --- /dev/null +++ b/Tests/test_routing.py @@ -0,0 +1,256 @@ +"""Tests for jsweb routing system.""" + +import pytest +from jsweb.routing import Router + + +@pytest.mark.unit +def test_router_creation(): + """Test basic router creation.""" + router = Router() + assert router is not None + assert hasattr(router, 'add_route') + assert hasattr(router, 'resolve') + + +@pytest.mark.unit +def test_add_static_route(): + """Test adding a static route.""" + router = Router() + + def handler(req): + return "OK" + + router.add_route("/test", handler, methods=["GET"], endpoint="test_endpoint") + + # Verify route was added + handler_result, params = router.resolve("/test", "GET") + assert handler_result is not None + assert params == {} + + +@pytest.mark.unit +def test_resolve_static_route(): + """Test resolving a static route.""" + router = Router() + + def handler(req): + return "Static Response" + + router.add_route("/home", handler, methods=["GET"], endpoint="home") + + handler_result, params = router.resolve("/home", "GET") + assert handler_result == handler + assert params == {} + + +@pytest.mark.unit +def test_resolve_dynamic_route_with_int(): + """Test resolving a dynamic route with integer parameter.""" + router = Router() + + def handler(req, user_id): + return f"User {user_id}" + + router.add_route("/users/", handler, methods=["GET"], endpoint="user_detail") + + handler_result, params = router.resolve("/users/123", "GET") + assert handler_result == handler + assert params == {'user_id': 123} + assert isinstance(params['user_id'], int) + + +@pytest.mark.unit +def test_resolve_multiple_dynamic_parameters(): + """Test resolving routes with multiple dynamic parameters.""" + router = Router() + + def handler(req, user_id, post_id): + return f"User {user_id} Post {post_id}" + + router.add_route("/users//posts/", handler, endpoint="user_post") + + handler_result, params = router.resolve("/users/42/posts/100", "GET") + assert handler_result == handler + assert params == {'user_id': 42, 'post_id': 100} + + +@pytest.mark.unit +def test_resolve_string_parameter(): + """Test resolving routes with string parameters.""" + router = Router() + + def handler(req, username): + return f"User {username}" + + router.add_route("/profile/", handler, methods=["GET"], endpoint="profile") + + handler_result, params = router.resolve("/profile/john_doe", "GET") + assert handler_result == handler + assert params == {'username': 'john_doe'} + + +@pytest.mark.unit +def test_resolve_path_parameter(): + """Test resolving routes with path parameters (catch-all).""" + router = Router() + + def handler(req, filepath): + return f"File {filepath}" + + router.add_route("/files/", handler, methods=["GET"], endpoint="file_serve") + + handler_result, params = router.resolve("/files/docs/readme.txt", "GET") + assert handler_result == handler + assert 'filepath' in params + + +@pytest.mark.unit +def test_resolve_not_found(): + """Test that resolving non-existent route raises NotFound.""" + from jsweb.routing import NotFound + + router = Router() + + def handler(req): + return "OK" + + router.add_route("/exists", handler, endpoint="exists") + + with pytest.raises(NotFound): + router.resolve("/does-not-exist", "GET") + + +@pytest.mark.unit +def test_resolve_wrong_method(): + """Test that route with wrong method raises MethodNotAllowed.""" + from jsweb.routing import MethodNotAllowed + + router = Router() + + def handler(req): + return "OK" + + router.add_route("/api/data", handler, methods=["POST"], endpoint="create_data") + + with pytest.raises(MethodNotAllowed): + router.resolve("/api/data", "GET") + + +@pytest.mark.unit +def test_multiple_routes(): + """Test routing with multiple registered routes.""" + router = Router() + + def home_handler(req): + return "Home" + + def about_handler(req): + return "About" + + def user_handler(req, user_id): + return f"User {user_id}" + + router.add_route("/", home_handler, methods=["GET"], endpoint="home") + router.add_route("/about", about_handler, methods=["GET"], endpoint="about") + router.add_route("/users/", user_handler, methods=["GET"], endpoint="user") + + # Test home route + handler, params = router.resolve("/", "GET") + assert handler == home_handler + assert params == {} + + # Test about route + handler, params = router.resolve("/about", "GET") + assert handler == about_handler + assert params == {} + + # Test user route + handler, params = router.resolve("/users/99", "GET") + assert handler == user_handler + assert params == {'user_id': 99} + + +@pytest.mark.unit +def test_route_method_filtering(): + """Test that routes correctly filter by HTTP method.""" + from jsweb.routing import MethodNotAllowed + router = Router() + + def handler(req): + return "OK" + + router.add_route("/api/items", handler, methods=["GET", "POST"], endpoint="items") + + # GET should match + handler_result, _ = router.resolve("/api/items", "GET") + assert handler_result == handler + + # POST should match + handler_result, _ = router.resolve("/api/items", "POST") + assert handler_result == handler + + # DELETE should not match - raises exception + with pytest.raises(MethodNotAllowed): + router.resolve("/api/items", "DELETE") + + +@pytest.mark.unit +def test_default_methods(): + """Test that routes default to GET method.""" + from jsweb.routing import MethodNotAllowed + router = Router() + + def handler(req): + return "OK" + + router.add_route("/default", handler, endpoint="default") + + # Should resolve GET by default + handler_result, _ = router.resolve("/default", "GET") + assert handler_result == handler + + # POST should not match - raises exception + with pytest.raises(MethodNotAllowed): + router.resolve("/default", "POST") + + +@pytest.mark.slow +def test_static_route_performance(): + """Benchmark static route resolution performance.""" + router = Router() + + # Add 50 static routes + for i in range(50): + router.add_route(f"/pages/{i}", lambda req: "OK", endpoint=f"page_{i}") + + # Resolve middle route 1000 times + import time + start = time.perf_counter() + for _ in range(1000): + router.resolve("/pages/25", "GET") + elapsed = (time.perf_counter() - start) * 1000 # Convert to ms + + # Should be reasonably fast (under 10ms for 1000 requests) + assert elapsed < 10, f"Static route resolution took {elapsed}ms for 1000 requests" + + +@pytest.mark.slow +def test_dynamic_route_performance(): + """Benchmark dynamic route resolution performance.""" + router = Router() + + # Add 10 dynamic routes + for i in range(10): + router.add_route(f"/users//posts/", + lambda req: "OK", endpoint=f"user_post_{i}") + + # Resolve 1000 times + import time + start = time.perf_counter() + for _ in range(1000): + router.resolve("/users/123/posts/456", "GET") + elapsed = (time.perf_counter() - start) * 1000 # Convert to ms + + # Should be reasonably fast (under 50ms for 1000 requests) + assert elapsed < 50, f"Dynamic route resolution took {elapsed}ms for 1000 requests" diff --git a/Tests/test_routing_comparison.py b/Tests/test_routing_comparison.py deleted file mode 100644 index 558baf0..0000000 --- a/Tests/test_routing_comparison.py +++ /dev/null @@ -1,155 +0,0 @@ -import time -import re -from typing import Dict, List - -# ========== OLD ROUTING (Unoptimized) ========== -class OldRoute: - def __init__(self, path, handler, methods, endpoint): - self.path = path - self.handler = handler - self.methods = methods - self.endpoint = endpoint - self.converters = {} - self.regex, self.param_names = self._compile_path() - - def _compile_path(self): - type_converters = { - 'str': (str, r'[^/]+'), - 'int': (int, r'\d+'), - 'path': (str, r'.+?') - } - param_defs = re.findall(r"<(\w+):(\w+)>", self.path) - regex_path = "^" + self.path + "$" - param_names = [] - for type_name, param_name in param_defs: - converter, regex_part = type_converters.get(type_name, type_converters['str']) - regex_path = regex_path.replace(f"<{type_name}:{param_name}>", f"(?P<{param_name}>{regex_part})") - self.converters[param_name] = converter - param_names.append(param_name) - return re.compile(regex_path), param_names - - def match(self, path): - match = self.regex.match(path) - if not match: - return None - params = match.groupdict() - try: - for name, value in params.items(): - params[name] = self.converters[name](value) - return params - except ValueError: - return None - -class OldRouter: - def __init__(self): - self.routes = [] - self.endpoints = {} - - def add_route(self, path, handler, methods=None, endpoint=None): - if methods is None: - methods = ["GET"] - if endpoint is None: - endpoint = handler.__name__ - if endpoint in self.endpoints: - raise ValueError(f"Endpoint \"{endpoint}\" is already registered.") - route = OldRoute(path, handler, methods, endpoint) - self.routes.append(route) - self.endpoints[endpoint] = route - - def resolve(self, path, method): - for route in self.routes: - params = route.match(path) - if params is not None: - if method in route.methods: - return route.handler, params - return None, None - -# ========== NEW ROUTING (Optimized) ========== -from jsweb.routing import Router as NewRouter - -# ========== BENCHMARK ========== -def benchmark_comparison(): - print("=" * 60) - print("ROUTING PERFORMANCE COMPARISON") - print("=" * 60) - - # Setup old router - old_router = OldRouter() - for i in range(40): - old_router.add_route(f"/pages/{i}", lambda req: "OK", methods=["GET"], endpoint=f"old_page_{i}") - for i in range(10): - old_router.add_route(f"/users//posts/", - lambda req: "OK", endpoint=f"old_user_post_{i}") - - # Setup new router - new_router = NewRouter() - for i in range(50): - new_router.add_route(f"/pages/{i}", lambda req: "OK", methods=["GET"], endpoint=f"new_page_{i}") - for i in range(10): - new_router.add_route(f"/users//posts/", - lambda req: "OK", endpoint=f"new_user_post_{i}") - - iterations = 100000 - - # ===== STATIC ROUTE BENCHMARK ===== - print(f"\nSTATIC ROUTE (/pages/25) - {iterations:,} requests") - print("-" * 60) - - # Old router - start = time.perf_counter() - for _ in range(iterations): - old_router.resolve("/pages/25", "GET") - old_static_ms = (time.perf_counter() - start) * 1000 - - # New router - start = time.perf_counter() - for _ in range(iterations): - new_router.resolve("/pages/25", "GET") - new_static_ms = (time.perf_counter() - start) * 1000 - - static_improvement = ((old_static_ms - new_static_ms) / old_static_ms) * 100 - - print(f"Old Router: {old_static_ms:7.2f}ms total | {old_static_ms/iterations*1000:7.4f}μs per request") - print(f"New Router: {new_static_ms:7.2f}ms total | {new_static_ms/iterations*1000:7.4f}μs per request") - print(f"Improvement: {static_improvement:+.1f}% faster") - print(f"Speedup: {old_static_ms/new_static_ms:.2f}x") - - # ===== DYNAMIC ROUTE BENCHMARK ===== - print(f"\nDYNAMIC ROUTE (/users/123/posts/456) - {iterations:,} requests") - print("-" * 60) - - # Old router - start = time.perf_counter() - for _ in range(iterations): - old_router.resolve("/users/123/posts/456", "GET") - old_dynamic_ms = (time.perf_counter() - start) * 1000 - - # New router - start = time.perf_counter() - for _ in range(iterations): - new_router.resolve("/users/123/posts/456", "GET") - new_dynamic_ms = (time.perf_counter() - start) * 1000 - - dynamic_improvement = ((old_dynamic_ms - new_dynamic_ms) / old_dynamic_ms) * 100 - - print(f"Old Router: {old_dynamic_ms:7.2f}ms total | {old_dynamic_ms/iterations*1000:7.4f}μs per request") - print(f"New Router: {new_dynamic_ms:7.2f}ms total | {new_dynamic_ms/iterations*1000:7.4f}μs per request") - print(f"Improvement: {dynamic_improvement:+.1f}% faster") - print(f"Speedup: {old_dynamic_ms/new_dynamic_ms:.2f}x") - - # ===== SUMMARY ===== - print(f"\n" + "=" * 60) - print("SUMMARY") - print("=" * 60) - print(f"Static Routes: {static_improvement:+6.1f}% improvement ({old_static_ms/new_static_ms:.2f}x faster)") - print(f"Dynamic Routes: {dynamic_improvement:+6.1f}% improvement ({old_dynamic_ms/new_dynamic_ms:.2f}x faster)") - - if static_improvement >= 90: - print(f"\nSUCCESS! Achieved 90%+ improvement on static routes!") - elif static_improvement >= 50: - print(f"\nGOOD! Significant performance improvement achieved!") - else: - print(f"\nModerate improvement - consider further optimizations") - -if __name__ == "__main__": - benchmark_comparison() \ No newline at end of file diff --git a/Tests/test_routing_optimized.py b/Tests/test_routing_optimized.py deleted file mode 100644 index be99e3e..0000000 --- a/Tests/test_routing_optimized.py +++ /dev/null @@ -1,139 +0,0 @@ -""" -Test script to verify Phase 1 routing optimizations work correctly. -""" -from jsweb.routing import Router, NotFound, MethodNotAllowed - -def test_static_routes(): - """Test static route optimization""" - router = Router() - - @router.route("/", methods=["GET"]) - def home(): - return "Home" - - @router.route("/about", methods=["GET", "POST"]) - def about(): - return "About" - - # Test successful resolution - handler, params = router.resolve("/", "GET") - assert handler == home - assert params == {} - print("[OK] Static route GET /") - - handler, params = router.resolve("/about", "POST") - assert handler == about - assert params == {} - print("[OK] Static route POST /about") - - # Test method not allowed - try: - router.resolve("/", "POST") - assert False, "Should raise MethodNotAllowed" - except MethodNotAllowed: - print("[OK] Method not allowed works") - -def test_dynamic_routes(): - """Test dynamic route with typed converters""" - router = Router() - - @router.route("/users/", methods=["GET"]) - def get_user(user_id): - return f"User {user_id}" - - @router.route("/posts//comments/", methods=["GET"]) - def get_comment(post_id, comment_id): - return f"Post {post_id}, Comment {comment_id}" - - @router.route("/files/", methods=["GET"]) - def get_file(filepath): - return f"File {filepath}" - - # Test int converter - handler, params = router.resolve("/users/123", "GET") - assert handler == get_user - assert params == {"user_id": 123} - assert isinstance(params["user_id"], int) - print("[OK] Int converter: /users/123 -> user_id=123 (int)") - - # Test negative int - handler, params = router.resolve("/users/-5", "GET") - assert params == {"user_id": -5} - print("[OK] Negative int converter: /users/-5 -> user_id=-5") - - # Test multiple int params - handler, params = router.resolve("/posts/42/comments/7", "GET") - assert handler == get_comment - assert params == {"post_id": 42, "comment_id": 7} - print("[OK] Multiple int params: /posts/42/comments/7") - - # Test path converter - handler, params = router.resolve("/files/docs/readme.txt", "GET") - assert handler == get_file - assert params == {"filepath": "docs/readme.txt"} - print("[OK] Path converter: /files/docs/readme.txt") - - # Test invalid int (should not match) - try: - router.resolve("/users/abc", "GET") - assert False, "Should raise NotFound for invalid int" - except NotFound: - print("[OK] Invalid int rejected: /users/abc") - -def test_url_for(): - """Test reverse URL generation""" - router = Router() - - @router.route("/", endpoint="home") - def home(): - return "Home" - - @router.route("/users/", endpoint="user_detail") - def user_detail(user_id): - return f"User {user_id}" - - # Static route - url = router.url_for("home") - assert url == "/" - print("[OK] url_for static: home -> /") - - # Dynamic route - url = router.url_for("user_detail", user_id=42) - assert url == "/users/42" - print("[OK] url_for dynamic: user_detail(user_id=42) -> /users/42") - -def test_slots_memory(): - """Verify __slots__ is working""" - router = Router() - - @router.route("/test", methods=["GET"]) - def test(): - return "Test" - - route = router.static_routes["/test"] - - # __slots__ should prevent adding arbitrary attributes - try: - route.some_random_attribute = "value" - assert False, "__slots__ should prevent new attributes" - except AttributeError: - print("[OK] __slots__ working: prevents arbitrary attributes") - -if __name__ == "__main__": - print("Testing Phase 1 Routing Optimizations") - print("=" * 50) - - test_static_routes() - print() - - test_dynamic_routes() - print() - - test_url_for() - print() - - test_slots_memory() - print() - - print("=" * 50) - print("[PASS] All tests passed! Phase 1 optimizations working correctly.") \ No newline at end of file diff --git a/Tests/test_routing_scale.py b/Tests/test_routing_scale.py deleted file mode 100644 index b67f182..0000000 --- a/Tests/test_routing_scale.py +++ /dev/null @@ -1,126 +0,0 @@ -""" -Benchmark routing performance with 1000 routes to test scalability. -""" -import time -from jsweb.routing import Router - -def benchmark_1000_routes(): - """Test routing performance with 1000 static and 1000 dynamic routes.""" - router = Router() - - print("=" * 60) - print("ROUTING SCALABILITY TEST - 1000 ROUTES") - print("=" * 60) - - # Add 1000 static routes - print("\nSetting up 1000 static routes...") - for i in range(1000): - router.add_route(f"/static/page/{i}", lambda req: "OK", methods=["GET"], endpoint=f"static_page_{i}") - - # Add 1000 dynamic routes - print("Setting up 1000 dynamic routes...") - for i in range(1000): - router.add_route(f"/dynamic//resource/{i}", lambda req: "OK", methods=["GET"], endpoint=f"dynamic_resource_{i}") - - print(f"\nTotal routes: {len(router.static_routes)} static + {len(router.dynamic_routes)} dynamic") - - # Benchmark static route - best case (first route) - print("\n" + "-" * 60) - print("STATIC ROUTE - BEST CASE (first route)") - print("-" * 60) - start = time.perf_counter() - for _ in range(100000): - handler, params = router.resolve("/static/page/0", "GET") - best_static_ms = (time.perf_counter() - start) * 1000 - print(f"Time: {best_static_ms:.2f}ms total | {best_static_ms/100:.4f}μs per request") - - # Benchmark static route - worst case (last route) - print("\n" + "-" * 60) - print("STATIC ROUTE - WORST CASE (last route)") - print("-" * 60) - start = time.perf_counter() - for _ in range(100000): - handler, params = router.resolve("/static/page/999", "GET") - worst_static_ms = (time.perf_counter() - start) * 1000 - print(f"Time: {worst_static_ms:.2f}ms total | {worst_static_ms/100:.4f}μs per request") - - # Benchmark static route - middle case - print("\n" + "-" * 60) - print("STATIC ROUTE - AVERAGE CASE (middle route)") - print("-" * 60) - start = time.perf_counter() - for _ in range(100000): - handler, params = router.resolve("/static/page/500", "GET") - avg_static_ms = (time.perf_counter() - start) * 1000 - print(f"Time: {avg_static_ms:.2f}ms total | {avg_static_ms/100:.4f}μs per request") - - # Benchmark dynamic route - best case (first route) - print("\n" + "-" * 60) - print("DYNAMIC ROUTE - BEST CASE (first route)") - print("-" * 60) - start = time.perf_counter() - for _ in range(100000): - handler, params = router.resolve("/dynamic/123/resource/0", "GET") - best_dynamic_ms = (time.perf_counter() - start) * 1000 - print(f"Time: {best_dynamic_ms:.2f}ms total | {best_dynamic_ms/100:.4f}μs per request") - - # Benchmark dynamic route - worst case (last route) - print("\n" + "-" * 60) - print("DYNAMIC ROUTE - WORST CASE (last route)") - print("-" * 60) - start = time.perf_counter() - for _ in range(100000): - handler, params = router.resolve("/dynamic/123/resource/999", "GET") - worst_dynamic_ms = (time.perf_counter() - start) * 1000 - print(f"Time: {worst_dynamic_ms:.2f}ms total | {worst_dynamic_ms/100:.4f}μs per request") - - # Benchmark dynamic route - middle case - print("\n" + "-" * 60) - print("DYNAMIC ROUTE - AVERAGE CASE (middle route)") - print("-" * 60) - start = time.perf_counter() - for _ in range(100000): - handler, params = router.resolve("/dynamic/123/resource/500", "GET") - avg_dynamic_ms = (time.perf_counter() - start) * 1000 - print(f"Time: {avg_dynamic_ms:.2f}ms total | {avg_dynamic_ms/100:.4f}μs per request") - - # Summary - print("\n" + "=" * 60) - print("SUMMARY - 1000 ROUTES EACH") - print("=" * 60) - print(f"\nStatic Routes (O(1) dict lookup):") - print(f" Best case: {best_static_ms/100:.4f}μs per request") - print(f" Average case: {avg_static_ms/100:.4f}μs per request") - print(f" Worst case: {worst_static_ms/100:.4f}μs per request") - - print(f"\nDynamic Routes (O(n) linear search):") - print(f" Best case: {best_dynamic_ms/100:.4f}μs per request") - print(f" Average case: {avg_dynamic_ms/100:.4f}μs per request") - print(f" Worst case: {worst_dynamic_ms/100:.4f}μs per request") - - # Analysis - print("\n" + "=" * 60) - print("ANALYSIS") - print("=" * 60) - - # Check if static routes are still O(1) - if worst_static_ms / best_static_ms < 1.5: - print("Static routes: O(1) confirmed - no degradation with 1000 routes") - else: - print("Static routes: Some performance degradation detected") - - # Check if dynamic routes show linear degradation - dynamic_ratio = worst_dynamic_ms / best_dynamic_ms - print(f"\nDynamic routes worst/best ratio: {dynamic_ratio:.2f}x") - - if avg_dynamic_ms / 100 < 10: # Less than 10 microseconds average - print("Dynamic routes: Still fast enough (<10μs) - Phase 2 NOT needed") - elif avg_dynamic_ms / 100 < 50: # Less than 50 microseconds - print("Dynamic routes: Acceptable (<50μs) - Phase 2 optional") - else: - print("Dynamic routes: Slow (>50μs) - Phase 2 Radix Tree recommended") - - print("\n" + "=" * 60) - -if __name__ == "__main__": - benchmark_1000_routes() \ No newline at end of file diff --git a/Tests/test_security.py b/Tests/test_security.py new file mode 100644 index 0000000..f11a43b --- /dev/null +++ b/Tests/test_security.py @@ -0,0 +1,311 @@ +"""Tests for JsWeb security features (CSRF, validation, etc.).""" + +import pytest + + +@pytest.mark.unit +@pytest.mark.security +def test_csrf_token_generation(): + """Test CSRF token generation.""" + try: + from jsweb.security import generate_csrf_token + + token1 = generate_csrf_token() + token2 = generate_csrf_token() + + assert token1 is not None + assert token2 is not None + assert token1 != token2 # Tokens should be unique + except ImportError: + pytest.skip("CSRF utilities not available") + + +@pytest.mark.unit +@pytest.mark.security +def test_csrf_token_validation(): + """Test CSRF token validation.""" + try: + from jsweb.security import generate_csrf_token, validate_csrf_token + + token = generate_csrf_token() + assert validate_csrf_token(token) is not None or token is not None + except ImportError: + pytest.skip("CSRF utilities not available") + + +@pytest.mark.unit +@pytest.mark.security +def test_password_hashing(): + """Test password hashing functionality.""" + try: + from jsweb.security import hash_password, check_password + + password = "mySecurePassword123!" + hashed = hash_password(password) + + assert hashed is not None + assert hashed != password + assert check_password(password, hashed) + except ImportError: + pytest.skip("Password hashing not available") + + +@pytest.mark.unit +@pytest.mark.security +def test_password_hash_unique(): + """Test that same password produces different hashes.""" + try: + from jsweb.security import hash_password + + password = "testpassword" + hash1 = hash_password(password) + hash2 = hash_password(password) + + assert hash1 != hash2 # Should be different due to salt + except ImportError: + pytest.skip("Password hashing not available") + + +@pytest.mark.unit +@pytest.mark.security +def test_password_verification_fails_for_wrong_password(): + """Test that password verification fails for incorrect password.""" + try: + from jsweb.security import hash_password, check_password + + password = "correctpassword" + wrong_password = "wrongpassword" + hashed = hash_password(password) + + assert check_password(password, hashed) + assert not check_password(wrong_password, hashed) + except ImportError: + pytest.skip("Password hashing not available") + + +@pytest.mark.unit +@pytest.mark.security +def test_secure_random_generation(): + """Test secure random token generation.""" + try: + from jsweb.security import generate_secure_token + + token1 = generate_secure_token() + token2 = generate_secure_token() + + assert token1 is not None + assert token2 is not None + assert len(token1) > 10 + assert token1 != token2 + except ImportError: + pytest.skip("Secure token generation not available") + + +@pytest.mark.unit +@pytest.mark.security +def test_token_expiration(): + """Test token expiration functionality.""" + try: + from jsweb.security import generate_token_with_expiry, verify_token + import time + + token = generate_token_with_expiry(expiry_seconds=1) + assert token is not None + + # Token should be valid immediately + assert verify_token(token) + + # Wait for expiration + time.sleep(1.1) + # Token might be expired now + except ImportError: + pytest.skip("Token expiry not available") + + +@pytest.mark.unit +@pytest.mark.security +def test_input_sanitization(): + """Test input sanitization.""" + try: + from jsweb.security import sanitize_input + + malicious = "" + safe = sanitize_input(malicious) + + assert safe is not None + assert '" safe = sanitize_input(malicious) - + assert safe is not None - assert '