API Reference
Examples
API Examples
This page provides comprehensive examples of using the RL-IDS API for various integration scenarios.
Basic Usage Examples
1. Service Health Check
# Check if the API service is running
curl -X GET http://localhost:8000/health
# Expected response
{
"status": "healthy",
"timestamp": "2024-01-15T10:30:00Z",
"details": {
"model_loaded": true,
"api_version": "1.2.0",
"memory_usage": 45.6,
"uptime": 3600
}
}
2. Get Model Information
# Get information about the loaded model
curl -X GET http://localhost:8000/model/info
# Expected response
{
"model_name": "DQN_IDS_Model",
"model_type": "Deep Q-Network",
"version": "1.2.0",
"input_features": 78,
"output_classes": 2,
"class_names": ["BENIGN", "ATTACK"],
"training_date": "2024-01-10T15:30:00Z",
"accuracy": 0.953,
"false_positive_rate": 0.047
}
3. Single Prediction
# Make a single prediction
curl -X POST http://localhost:8000/predict \
-H "Content-Type: application/json" \
-d '{
"features": [
0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0,
1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2.0,
2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.7, 2.8, 2.9, 3.0,
3.1, 3.2, 3.3, 3.4, 3.5, 3.6, 3.7, 3.8, 3.9, 4.0,
4.1, 4.2, 4.3, 4.4, 4.5, 4.6, 4.7, 4.8, 4.9, 5.0,
5.1, 5.2, 5.3, 5.4, 5.5, 5.6, 5.7, 5.8, 5.9, 6.0,
6.1, 6.2, 6.3, 6.4, 6.5, 6.6, 6.7, 6.8, 6.9, 7.0,
7.1, 7.2, 7.3, 7.4, 7.5, 7.6, 7.7, 7.8
]
}'
# Expected response
{
"prediction": {
"class": "BENIGN",
"confidence": 0.87,
"probability": {
"BENIGN": 0.87,
"ATTACK": 0.13
}
},
"timestamp": "2024-01-15T10:35:00Z",
"processing_time": 0.045
}
4. Batch Predictions
# Make multiple predictions at once
curl -X POST http://localhost:8000/predict/batch \
-H "Content-Type: application/json" \
-d '{
"features": [
[0.1, 0.2, 0.3, ..., 7.8],
[1.1, 1.2, 1.3, ..., 8.8],
[2.1, 2.2, 2.3, ..., 9.8]
]
}'
# Expected response
{
"predictions": [
{
"class": "BENIGN",
"confidence": 0.87,
"probability": {"BENIGN": 0.87, "ATTACK": 0.13}
},
{
"class": "ATTACK",
"confidence": 0.92,
"probability": {"BENIGN": 0.08, "ATTACK": 0.92}
},
{
"class": "BENIGN",
"confidence": 0.79,
"probability": {"BENIGN": 0.79, "ATTACK": 0.21}
}
],
"timestamp": "2024-01-15T10:40:00Z",
"processing_time": 0.098,
"total_predictions": 3
}
Python Integration Examples
1. Using Requests Library
import requests
import json
# API configuration
API_BASE_URL = "http://localhost:8000"
def check_api_health():
    """Return the /health payload as a dict, or None when the service is unreachable."""
    url = f"{API_BASE_URL}/health"
    try:
        resp = requests.get(url)
        resp.raise_for_status()
    except requests.exceptions.RequestException as exc:
        print(f"Health check failed: {exc}")
        return None
    return resp.json()
def get_model_info():
    """Fetch metadata about the currently loaded model, or None on failure."""
    url = f"{API_BASE_URL}/model/info"
    try:
        resp = requests.get(url)
        resp.raise_for_status()
    except requests.exceptions.RequestException as exc:
        print(f"Failed to get model info: {exc}")
        return None
    return resp.json()
def make_prediction(features):
    """POST a single feature vector to /predict; return the decoded response or None."""
    try:
        resp = requests.post(
            f"{API_BASE_URL}/predict",
            json={"features": features},
            headers={"Content-Type": "application/json"},
        )
        resp.raise_for_status()
    except requests.exceptions.RequestException as exc:
        print(f"Prediction failed: {exc}")
        return None
    return resp.json()
def make_batch_predictions(features_list):
    """POST several feature vectors to /predict/batch; return the decoded response or None."""
    try:
        resp = requests.post(
            f"{API_BASE_URL}/predict/batch",
            json={"features": features_list},
            headers={"Content-Type": "application/json"},
        )
        resp.raise_for_status()
    except requests.exceptions.RequestException as exc:
        print(f"Batch prediction failed: {exc}")
        return None
    return resp.json()
# Example usage
if __name__ == "__main__":
    # Verify the service is reachable before issuing predictions
    print(f"API Health: {check_api_health()}")
    print(f"Model Info: {get_model_info()}")

    # A 78-dimensional mock feature vector
    sample_features = [0.1] * 78
    print(f"Prediction: {make_prediction(sample_features)}")

    # Score three identical samples in a single batch call
    print(f"Batch Predictions: {make_batch_predictions([sample_features] * 3)}")
2. Using the Official Python Client
from api.client import RLIDSClient
import asyncio
async def main():
    """Exercise the official client: health, model info, single and batch predictions."""
    client = RLIDSClient("http://localhost:8000")
    try:
        print(f"Health: {await client.health_check()}")
        print(f"Model: {await client.get_model_info()}")

        # Mock 78-feature sample
        features = [0.1] * 78
        print(f"Prediction: {await client.predict(features)}")

        # Five identical samples in one batch request
        print(f"Batch Results: {await client.predict_batch([features] * 5)}")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        await client.close()

# Run the example
if __name__ == "__main__":
    asyncio.run(main())
3. Real-time Network Monitoring Integration
import asyncio
import time
from api.client import RLIDSClient
from rl_ids.make_dataset import extract_features
class NetworkMonitor:
    """Polls network traffic, scores it through the RL-IDS API, and raises alerts."""

    def __init__(self, api_url="http://localhost:8000"):
        self.client = RLIDSClient(api_url)
        self.running = False

    async def start_monitoring(self):
        """Run the capture/predict/alert loop until stop_monitoring() is called."""
        self.running = True
        print("Starting network monitoring...")
        try:
            while self.running:
                # Capture network data (simplified)
                sample = self.capture_network_data()
                if sample:
                    feature_vector = extract_features(sample)
                    verdict = await self.client.predict(feature_vector)
                    await self.process_prediction(verdict, sample)
                # Pause between capture cycles
                await asyncio.sleep(1.0)
        except KeyboardInterrupt:
            print("Monitoring stopped by user")
        except Exception as e:
            print(f"Monitoring error: {e}")
        finally:
            await self.client.close()

    def capture_network_data(self):
        """Capture network data (placeholder returning mock statistics)."""
        # A real implementation would do packet capture here.
        return {
            'packets': 100,
            'bytes': 5000,
            'duration': 1.0,
            'protocols': ['TCP', 'UDP']
        }

    async def process_prediction(self, prediction, network_data):
        """Print a normal-traffic line, or escalate to send_alert on an attack verdict."""
        verdict = prediction['prediction']
        if verdict['class'] != 'ATTACK':
            print(f"Normal traffic detected ({verdict['confidence']:.2%})")
            return
        confidence = verdict['confidence']
        print(f"ALERT: Attack detected with {confidence:.2%} confidence")
        print(f"Network data: {network_data}")
        # Here you would implement alerting logic
        await self.send_alert(prediction, network_data)

    async def send_alert(self, prediction, network_data):
        """Send alert for detected attack (placeholder: prints the alert payload)."""
        alert = {
            'timestamp': time.time(),
            'type': 'ATTACK_DETECTED',
            'confidence': prediction['prediction']['confidence'],
            'network_data': network_data
        }
        # Implement your alerting mechanism here
        print(f"Sending alert: {alert}")

    def stop_monitoring(self):
        """Request the monitoring loop to exit after the current cycle."""
        self.running = False
# Usage example
if __name__ == "__main__":
    asyncio.run(NetworkMonitor().start_monitoring())
Error Handling Examples
1. Handling API Errors
import requests
from requests.exceptions import RequestException, Timeout, ConnectionError
def robust_prediction(features, max_retries=3):
    """Make a prediction with error handling and retries.

    Args:
        features: Feature vector (list of 78 floats) to score.
        max_retries: Maximum number of attempts before giving up.

    Returns:
        The decoded JSON response on success, or None on failure.
    """
    # Fix: the original snippet called time.sleep without importing time,
    # which raised NameError on the first retry.
    import time

    for attempt in range(max_retries):
        try:
            response = requests.post(
                "http://localhost:8000/predict",
                json={"features": features},
                timeout=30
            )
            # 422 means the payload failed validation; retrying cannot help.
            if response.status_code == 422:
                print("Validation error: Invalid input format")
                return None
            response.raise_for_status()
            return response.json()
        except ConnectionError:
            print(f"Connection error (attempt {attempt + 1}/{max_retries})")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
        except Timeout:
            print(f"Request timeout (attempt {attempt + 1}/{max_retries})")
            if attempt < max_retries - 1:
                time.sleep(1)
        except RequestException as e:
            # Other request failures are not retried.
            print(f"Request failed: {e}")
            break
    print("All retry attempts failed")
    return None
# Example usage
result = robust_prediction([0.1] * 78)
if result:
    print(f"Prediction successful: {result}")
else:
    print("Prediction failed after all retries")
2. Input Validation
def validate_features(features):
    """Validate a feature vector before sending it to the API.

    Raises ValueError on any violation; returns True when the input is valid.
    """
    if not isinstance(features, list):
        raise ValueError("Features must be a list")
    if len(features) != 78:
        raise ValueError(f"Expected 78 features, got {len(features)}")
    for i, value in enumerate(features):
        if not isinstance(value, (int, float)):
            raise ValueError(f"Feature {i} must be numeric, got {type(value)}")
        # Reasonable bounds check (also rejects NaN via the chained comparison)
        if not (-1000 <= value <= 1000):
            raise ValueError(f"Feature {i} value {value} is out of reasonable bounds")
    return True
def safe_prediction(features):
    """Validate features locally, then request a prediction from the API.

    Returns the decoded response on success, None on any validation or API error.
    """
    try:
        # Reject malformed input before it reaches the network
        validate_features(features)
        response = requests.post(
            "http://localhost:8000/predict",
            json={"features": features}
        )
        if response.status_code != 200:
            print(f"API error: {response.status_code} - {response.text}")
            return None
        return response.json()
    except ValueError as e:
        print(f"Input validation error: {e}")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None
# Example usage
try:
    result = safe_prediction([0.1] * 78)  # Valid input
    result = safe_prediction([0.1] * 77)  # Invalid input: prints a validation error
except Exception as e:
    print(f"Error: {e}")
Performance Optimization Examples
1. Batch Processing for High Throughput
import asyncio
import aiohttp
from typing import List, Dict
class BatchProcessor:
    """Splits a large feature set into fixed-size batches for /predict/batch."""

    def __init__(self, api_url: str, batch_size: int = 100):
        self.api_url = api_url
        self.batch_size = batch_size
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def process_large_dataset(self, features_list: List[List[float]]):
        """Submit features_list batch by batch; a failed batch is skipped, not fatal."""
        results = []
        batch_no = 0
        for start in range(0, len(features_list), self.batch_size):
            batch_no += 1
            chunk = features_list[start:start + self.batch_size]
            try:
                results.extend(await self._process_batch(chunk))
                print(f"Processed batch {batch_no}, "
                      f"total processed: {len(results)}")
            except Exception as e:
                print(f"Batch processing error: {e}")
                # Continue with next batch
                continue
        return results

    async def _process_batch(self, batch: List[List[float]]):
        """POST one batch; return its predictions, or raise on a non-200 status."""
        async with self.session.post(
            f"{self.api_url}/predict/batch",
            json={"features": batch}
        ) as response:
            if response.status != 200:
                raise Exception(f"API error: {response.status}")
            payload = await response.json()
            return payload["predictions"]
# Usage example
async def main():
    """Run 1000 mock samples through the batch processor and summarize the verdicts."""
    large_dataset = [[0.1] * 78 for _ in range(1000)]
    async with BatchProcessor("http://localhost:8000", batch_size=50) as processor:
        predictions = await processor.process_large_dataset(large_dataset)
        print(f"Processed {len(predictions)} predictions")
        # Analyze results
        attacks = sum(1 for p in predictions if p["class"] == "ATTACK")
        print(f"Detected {attacks} attacks out of {len(predictions)} samples")

if __name__ == "__main__":
    asyncio.run(main())
2. Connection Pooling and Reuse
import aiohttp
import asyncio
from typing import Optional
class OptimizedClient:
    """Async API client with a tuned connection pool for high request rates.

    Note: the original annotations used `List[float]` / `Dict` without importing
    them from typing (only `Optional` is imported in this snippet), which raises
    NameError when the methods are defined. Builtin generics (Python 3.9+,
    PEP 585) avoid the extra imports entirely.
    """

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        # Configure connection pool
        connector = aiohttp.TCPConnector(
            limit=100,          # Total connection pool size
            limit_per_host=30,  # Per-host connection limit
            ttl_dns_cache=300,  # DNS cache TTL
            use_dns_cache=True,
        )
        # Fail fast on connect; bound total request time
        timeout = aiohttp.ClientTimeout(total=30, connect=5)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={"Content-Type": "application/json"}
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def predict(self, features: list[float]) -> dict:
        """Make a single prediction over the pooled connection."""
        async with self.session.post(
            f"{self.base_url}/predict",
            json={"features": features}
        ) as response:
            return await response.json()

    async def predict_many(self, features_list: list[list[float]]) -> list[dict]:
        """Fan out concurrent predictions; results from failed requests are dropped."""
        tasks = [asyncio.create_task(self.predict(features))
                 for features in features_list]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Filter out exceptions so one failed request doesn't poison the batch
        return [r for r in results if not isinstance(r, Exception)]
# Usage example
async def performance_test():
    """Time 100 concurrent predictions through the pooled client."""
    samples = [[0.1] * 78 for _ in range(100)]
    async with OptimizedClient("http://localhost:8000") as client:
        loop = asyncio.get_event_loop()
        started = loop.time()
        results = await client.predict_many(samples)
        elapsed = loop.time() - started
        print(f"Processed {len(results)} predictions in {elapsed:.2f} seconds")
        print(f"Average time per prediction: {elapsed / len(results):.3f} seconds")

if __name__ == "__main__":
    asyncio.run(performance_test())
Integration with Security Tools
1. SIEM Integration Example
import json
import logging
from datetime import datetime
from api.client import RLIDSClient
class SIEMIntegration:
    """Enriches network events with ML verdicts and forwards threats to a SIEM.

    Note: the original annotations used `Dict` / `List` without importing them
    from typing, which raises NameError when the class is defined. Builtin
    generics (Python 3.9+, PEP 585) fix this without extra imports.
    """

    def __init__(self, api_url: str, siem_endpoint: str):
        self.client = RLIDSClient(api_url)
        self.siem_endpoint = siem_endpoint
        self.logger = logging.getLogger(__name__)

    async def process_network_event(self, network_event: dict):
        """Score a network event and forward it to the SIEM when classified as an attack.

        Returns the enriched event dict, or None when processing failed.
        """
        try:
            # Extract features from network event
            features = self.extract_features(network_event)
            # Get prediction
            prediction = await self.client.predict(features)
            # Create enriched event
            enriched_event = {
                'timestamp': datetime.utcnow().isoformat(),
                'source_ip': network_event.get('source_ip'),
                'dest_ip': network_event.get('dest_ip'),
                'protocol': network_event.get('protocol'),
                'original_event': network_event,
                'ml_prediction': prediction,
                'threat_score': prediction['prediction']['confidence'],
                'classification': prediction['prediction']['class']
            }
            # Only attack verdicts are escalated to the SIEM
            if prediction['prediction']['class'] == 'ATTACK':
                await self.send_to_siem(enriched_event)
            return enriched_event
        except Exception as e:
            self.logger.error(f"Failed to process network event: {e}")
            return None

    def extract_features(self, network_event: dict) -> list[float]:
        """Extract ML features from a network event (placeholder: mock features)."""
        return [0.1] * 78

    async def send_to_siem(self, event: dict):
        """Format an enriched event and ship it to the SIEM endpoint."""
        try:
            # Format for SIEM (this example uses JSON)
            siem_event = {
                'timestamp': event['timestamp'],
                'event_type': 'ML_THREAT_DETECTION',
                'severity': self.calculate_severity(event),
                'source': 'RL-IDS',
                'details': event
            }
            # Placeholder for the real SIEM API call
            self.logger.info(f"Sending to SIEM: {json.dumps(siem_event, indent=2)}")
        except Exception as e:
            self.logger.error(f"Failed to send to SIEM: {e}")

    def calculate_severity(self, event: dict) -> str:
        """Map ML confidence to a SIEM severity band: >=0.9 HIGH, >=0.7 MEDIUM, else LOW."""
        confidence = event['threat_score']
        if confidence >= 0.9:
            return 'HIGH'
        elif confidence >= 0.7:
            return 'MEDIUM'
        else:
            return 'LOW'
# Usage example
async def main():
    """Demonstrate SIEM integration with a sample network event."""
    siem = SIEMIntegration(
        api_url="http://localhost:8000",
        siem_endpoint="https://siem.example.com/api/events"
    )
    # Sample network event
    network_event = {
        'source_ip': '192.168.1.100',
        'dest_ip': '10.0.0.1',
        'protocol': 'TCP',
        'port': 80,
        'packet_count': 1000,
        'byte_count': 50000
    }
    result = await siem.process_network_event(network_event)
    print(f"Processed event: {result}")

if __name__ == "__main__":
    # Fix: this snippet's imports omit asyncio, so asyncio.run raised NameError.
    import asyncio

    asyncio.run(main())
Monitoring and Alerting Examples
1. Health Monitoring
import asyncio
import logging
from datetime import datetime, timedelta
from api.client import RLIDSClient
class HealthMonitor:
    """Periodically probes the RL-IDS API and raises rate-limited alerts.

    Note: the original annotated `send_alert` with `Dict` (never imported from
    typing) and called `json.dumps` without importing `json`; both raised
    NameError. Fixed with a builtin-generic annotation and a scoped import.
    """

    def __init__(self, api_url: str, check_interval: int = 60):
        self.client = RLIDSClient(api_url)
        self.check_interval = check_interval
        self.logger = logging.getLogger(__name__)
        # Last alert time per alert type, used for the 5-minute cooldown
        self.alert_cooldown = {}

    async def start_monitoring(self):
        """Run health checks forever, sleeping check_interval seconds between them."""
        self.logger.info("Starting health monitoring...")
        while True:
            try:
                await self.check_health()
                await asyncio.sleep(self.check_interval)
            except KeyboardInterrupt:
                self.logger.info("Health monitoring stopped")
                break
            except Exception as e:
                self.logger.error(f"Health monitoring error: {e}")
                await asyncio.sleep(self.check_interval)

    async def check_health(self):
        """Check service status, model load state, memory usage, and response time."""
        try:
            # Get health status
            health = await self.client.health_check()
            if health['status'] != 'healthy':
                await self.send_alert('UNHEALTHY', health)
                return
            # Check model status
            if not health['details'].get('model_loaded', False):
                await self.send_alert('MODEL_NOT_LOADED', health)
                return
            # Check memory usage
            memory_usage = health['details'].get('memory_usage', 0)
            if memory_usage > 80:  # 80% threshold
                await self.send_alert('HIGH_MEMORY', health)
            # Time a lightweight request to detect latency degradation
            start_time = asyncio.get_event_loop().time()
            await self.client.get_model_info()
            response_time = asyncio.get_event_loop().time() - start_time
            if response_time > 5.0:  # 5 second threshold
                await self.send_alert('SLOW_RESPONSE', {
                    'response_time': response_time,
                    'health': health
                })
            self.logger.info(f"Health check passed: {health['status']}")
        except Exception as e:
            await self.send_alert('API_ERROR', {'error': str(e)})

    async def send_alert(self, alert_type: str, details: dict):
        """Send alert with a 5-minute per-type cooldown to prevent spam."""
        # Fix: json was never imported at the top of this snippet.
        import json

        now = datetime.utcnow()
        # Check cooldown
        if alert_type in self.alert_cooldown:
            last_alert = self.alert_cooldown[alert_type]
            if now - last_alert < timedelta(minutes=5):
                return  # Skip alert due to cooldown
        alert = {
            'timestamp': now.isoformat(),
            'type': alert_type,
            'details': details,
            'severity': self.get_alert_severity(alert_type)
        }
        self.logger.error(f"ALERT: {json.dumps(alert, indent=2)}")
        # Update cooldown
        self.alert_cooldown[alert_type] = now

    def get_alert_severity(self, alert_type: str) -> str:
        """Map an alert type to its severity; unknown types default to MEDIUM."""
        severity_map = {
            'UNHEALTHY': 'CRITICAL',
            'MODEL_NOT_LOADED': 'CRITICAL',
            'API_ERROR': 'HIGH',
            'HIGH_MEMORY': 'MEDIUM',
            'SLOW_RESPONSE': 'LOW'
        }
        return severity_map.get(alert_type, 'MEDIUM')
# Usage example
if __name__ == "__main__":
    asyncio.run(
        HealthMonitor("http://localhost:8000", check_interval=30).start_monitoring()
    )
These examples demonstrate various ways to integrate and use the RL-IDS API effectively in different scenarios, from simple predictions to complex production integrations with security tools and monitoring systems.