|
import pytest |
|
import pandas as pd |
|
import numpy as np |
|
import tempfile |
|
import os |
|
from unittest.mock import patch, MagicMock |
|
from agentic_ai_system.data_ingestion import load_data, validate_data, _load_csv_data, _load_synthetic_data |
|
|
|
class TestDataIngestion:
    """Test cases for the data ingestion module (load_data, validate_data, helpers)."""

    @pytest.fixture
    def config(self):
        """Sample configuration dict mirroring the application's config schema."""
        return {
            'data_source': {
                'type': 'csv',
                'path': 'data/market_data.csv'
            },
            'synthetic_data': {
                'base_price': 150.0,
                'volatility': 0.02,
                'trend': 0.001,
                'noise_level': 0.005,
                'data_path': 'data/synthetic_market_data.csv'
            },
            'trading': {
                'symbol': 'AAPL',
                'timeframe': '1min'
            }
        }

    @pytest.fixture
    def sample_csv_data(self):
        """Create 100 rows of random OHLCV data with valid price relationships.

        high/low are derived from open/close, so the OHLC invariants
        (high >= max(open, close), low <= min(open, close)) always hold.
        """
        dates = pd.date_range(start='2024-01-01', periods=100, freq='1min')

        rows = []
        for i, date in enumerate(dates):
            base_price = 150.0 + (i * 0.1)
            open_price = base_price + np.random.normal(0, 1)
            close_price = base_price + np.random.normal(0, 1)
            # Offset by a non-negative amount so the OHLC invariants cannot break.
            high_price = max(open_price, close_price) + abs(np.random.normal(0, 1))
            low_price = min(open_price, close_price) - abs(np.random.normal(0, 1))
            rows.append({
                'timestamp': date,
                'open': open_price,
                'high': high_price,
                'low': low_price,
                'close': close_price,
                'volume': np.random.randint(1000, 100000)
            })

        return pd.DataFrame(rows)

    def test_load_data_csv_type(self, config, sample_csv_data):
        """load_data with type 'csv' round-trips a CSV file into a DataFrame."""
        with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as tmp_file:
            sample_csv_data.to_csv(tmp_file.name, index=False)
            config['data_source']['path'] = tmp_file.name

            try:
                result = load_data(config)

                assert isinstance(result, pd.DataFrame)
                assert len(result) == len(sample_csv_data)
                assert list(result.columns) == list(sample_csv_data.columns)
            finally:
                os.unlink(tmp_file.name)

    def test_load_data_synthetic_type(self, config):
        """load_data with type 'synthetic' delegates to _load_synthetic_data."""
        config['data_source']['type'] = 'synthetic'

        with patch('agentic_ai_system.data_ingestion._load_synthetic_data') as mock_generate:
            mock_df = pd.DataFrame({
                'timestamp': pd.date_range('2024-01-01', periods=10, freq='1min'),
                'open': [150] * 10,
                'high': [155] * 10,
                'low': [145] * 10,
                'close': [152] * 10,
                'volume': [1000] * 10
            })
            mock_generate.return_value = mock_df

            result = load_data(config)

            assert isinstance(result, pd.DataFrame)
            mock_generate.assert_called_once_with(config)

    def test_load_data_invalid_type(self, config):
        """load_data returns None for an unrecognized data source type."""
        config['data_source']['type'] = 'invalid_type'

        result = load_data(config)
        assert result is None

    def test_load_csv_data_file_exists(self, config, sample_csv_data):
        """_load_csv_data reads an existing file and parses timestamps."""
        with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as tmp_file:
            sample_csv_data.to_csv(tmp_file.name, index=False)
            config['data_source']['path'] = tmp_file.name

            try:
                result = _load_csv_data(config)

                assert isinstance(result, pd.DataFrame)
                assert len(result) == len(sample_csv_data)
                # Timestamps must be parsed to datetimes, not left as strings.
                assert result['timestamp'].dtype == 'datetime64[ns]'
            finally:
                os.unlink(tmp_file.name)

    def test_load_csv_data_file_not_exists(self, config):
        """_load_csv_data returns None when the file doesn't exist."""
        config['data_source']['path'] = 'nonexistent_file.csv'

        result = _load_csv_data(config)

        assert result is None

    def test_load_csv_data_missing_columns(self, config):
        """_load_csv_data returns None when required OHLCV columns are absent."""
        with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as tmp_file:
            # Only timestamp/open/close — high, low and volume are missing.
            incomplete_data = pd.DataFrame({
                'timestamp': pd.date_range('2024-01-01', periods=10, freq='1min'),
                'open': [150] * 10,
                'close': [152] * 10
            })
            incomplete_data.to_csv(tmp_file.name, index=False)
            config['data_source']['path'] = tmp_file.name

            try:
                result = _load_csv_data(config)

                assert result is None
            finally:
                os.unlink(tmp_file.name)

    def test_load_synthetic_data(self, config):
        """_load_synthetic_data generates data when no cached file exists.

        Mocks both the file-existence check and the generator class so no
        real file I/O or generation happens.
        """
        mock_df = pd.DataFrame({
            'timestamp': pd.date_range('2024-01-01', periods=10, freq='1min'),
            'open': [150] * 10,
            'high': [155] * 10,
            'low': [145] * 10,
            'close': [152] * 10,
            'volume': [1000] * 10
        })
        with patch('os.path.exists', return_value=False):
            with patch('agentic_ai_system.synthetic_data_generator.SyntheticDataGenerator') as mock_generator_class:
                mock_generator = MagicMock()
                mock_generator_class.return_value = mock_generator
                mock_generator.generate_data.return_value = mock_df

                result = _load_synthetic_data(config)
                assert isinstance(result, pd.DataFrame)
                assert list(result.columns) == ['timestamp', 'open', 'high', 'low', 'close', 'volume']

    def test_validate_data_valid(self, sample_csv_data):
        """validate_data accepts well-formed OHLCV data."""
        # Work on a copy in case validate_data mutates its input.
        data_copy = sample_csv_data.copy()
        assert validate_data(data_copy)

    def test_validate_data_missing_columns(self):
        """validate_data rejects a frame missing required columns."""
        invalid_data = pd.DataFrame({
            'timestamp': pd.date_range('2024-01-01', periods=10, freq='1min'),
            'open': [150] * 10
        })

        assert not validate_data(invalid_data)

    def test_validate_data_negative_prices(self):
        """validate_data rejects negative prices."""
        invalid_data = pd.DataFrame({
            'timestamp': pd.date_range('2024-01-01', periods=10, freq='1min'),
            'open': [150] * 10,
            'high': [155] * 10,
            'low': [-145] * 10,
            'close': [152] * 10,
            'volume': [1000] * 10
        })

        assert not validate_data(invalid_data)

    def test_validate_data_negative_volumes(self):
        """Test data validation with negative volumes"""
        invalid_data = pd.DataFrame({
            'timestamp': pd.date_range('2024-01-01', periods=10, freq='1min'),
            'open': [150] * 10,
            'high': [155] * 10,
            'low': [145] * 10,
            'close': [152] * 10,
            'volume': [-1000] * 10
        })

        # NOTE(review): the docstring implies this data is invalid, yet the
        # expected result is True — presumably validate_data does not check
        # volume sign. Confirm against the implementation and either tighten
        # the validator or rename this test.
        assert validate_data(invalid_data)

    def test_validate_data_invalid_ohlc(self):
        """validate_data rejects rows where high < close (broken OHLC invariant)."""
        invalid_data = pd.DataFrame({
            'timestamp': pd.date_range('2024-01-01', periods=10, freq='1min'),
            'open': [150] * 10,
            'high': [145] * 10,
            'low': [145] * 10,
            'close': [152] * 10,
            'volume': [1000] * 10
        })

        assert not validate_data(invalid_data)

    def test_validate_data_null_values(self):
        """Test data validation with null values"""
        invalid_data = pd.DataFrame({
            'timestamp': pd.date_range('2024-01-01', periods=10, freq='1min'),
            'open': [150] * 10,
            'high': [155] * 10,
            'low': [145] * 10,
            'close': [152] * 10,
            'volume': [1000] * 10
        })

        # Introduce a single null in the first row.
        invalid_data.loc[0, 'open'] = None

        # NOTE(review): the expectations below imply validate_data drops null
        # rows *in place* (the frame shrinks from 10 to 9 rows) and still
        # reports success — confirm this mutation is intentional API behavior.
        result = validate_data(invalid_data)
        assert result

        assert len(invalid_data) == 9

    def test_validate_data_empty_dataframe(self):
        """validate_data rejects an empty DataFrame."""
        empty_data = pd.DataFrame()
        assert not validate_data(empty_data)

    def test_load_data_error_handling(self, config):
        """load_data returns None (rather than raising) on a missing CSV file."""
        config['data_source']['type'] = 'csv'
        config['data_source']['path'] = 'nonexistent_file.csv'

        result = load_data(config)
        assert result is None

    def test_csv_data_timestamp_conversion(self, config, sample_csv_data):
        """_load_csv_data converts string timestamps back to datetime64."""
        with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as tmp_file:
            # Use a copy so the shared fixture object is not mutated.
            string_ts_data = sample_csv_data.copy()
            string_ts_data['timestamp'] = string_ts_data['timestamp'].astype(str)
            string_ts_data.to_csv(tmp_file.name, index=False)
            config['data_source']['path'] = tmp_file.name

            try:
                result = _load_csv_data(config)

                assert result['timestamp'].dtype == 'datetime64[ns]'
            finally:
                os.unlink(tmp_file.name)

    def test_synthetic_data_directory_creation(self, config):
        """_load_synthetic_data creates the output directory when it's missing."""
        with patch('os.makedirs') as mock_makedirs:
            with patch('agentic_ai_system.synthetic_data_generator.SyntheticDataGenerator') as mock_generator_class:
                mock_generator = MagicMock()
                mock_generator_class.return_value = mock_generator

                mock_df = pd.DataFrame({'test': [1, 2, 3]})
                mock_generator.generate_data.return_value = mock_df

                # Force the "nothing exists yet" path.
                with patch('os.path.exists', return_value=False):
                    _load_synthetic_data(config)

                mock_makedirs.assert_called_once()

    def test_data_validation_edge_cases(self):
        """validate_data handles boundary inputs: a single row and very large values."""
        single_row_data = pd.DataFrame({
            'timestamp': [pd.Timestamp('2024-01-01')],
            'open': [150],
            'high': [155],
            'low': [145],
            'close': [152],
            'volume': [1000]
        })

        assert validate_data(single_row_data)

        large_data = pd.DataFrame({
            'timestamp': pd.date_range('2024-01-01', periods=5, freq='1min'),
            'open': [1e6] * 5,
            'high': [1e6 + 100] * 5,
            'low': [1e6 - 100] * 5,
            'close': [1e6 + 50] * 5,
            'volume': [1e9] * 5
        })

        assert validate_data(large_data)