Building Automated Threat Detection with Dark Web Monitoring APIs
Dark web monitoring APIs provide the raw threat intelligence. Your job as a developer is to transform that data into automated detection and response. This tutorial walks you through building a complete threat detection pipeline from scratch.
By the end, you'll have working code that polls for threats, filters relevant alerts, and triggers automated responses—all in under 100 lines.
Prerequisites
- Python 3.8+ or Node.js 16+
- Dark web monitoring API key (we'll use AdverseMonitor as an example)
- Basic understanding of REST APIs
- Optional: Slack webhook URL for notifications
Step 1: API Authentication
First, let's set up secure API authentication. Never hardcode API keys—use environment variables.
import os
import requests
from datetime import datetime, timedelta
# Load API key from environment
API_KEY = os.environ.get('DARKWEB_API_KEY')
API_BASE = 'https://api.adversemonitor.com/v1'
def get_headers():
return {
'Authorization': f'Bearer {API_KEY}',
'Content-Type': 'application/json'
}
# Test connection
def test_connection():
response = requests.get(
f'{API_BASE}/status',
headers=get_headers()
)
return response.status_code == 200
const axios = require('axios');
const API_KEY = process.env.DARKWEB_API_KEY;
const API_BASE = 'https://api.adversemonitor.com/v1';
const apiClient = axios.create({
baseURL: API_BASE,
headers: {
'Authorization': `Bearer ${API_KEY}`,
'Content-Type': 'application/json'
}
});
async function testConnection() {
const response = await apiClient.get('/status');
return response.status === 200;
}
Step 2: Building the Poller
The poller fetches new threats since the last check. We'll use a timestamp to track what we've already processed.
import json
from pathlib import Path
STATE_FILE = Path('last_poll.json')
def get_last_poll_time():
if STATE_FILE.exists():
data = json.loads(STATE_FILE.read_text())
return data.get('last_poll')
# Default: 1 hour ago
return (datetime.utcnow() - timedelta(hours=1)).isoformat()
def save_poll_time(timestamp):
STATE_FILE.write_text(json.dumps({'last_poll': timestamp}))
def fetch_new_threats():
since = get_last_poll_time()
response = requests.get(
f'{API_BASE}/threats',
headers=get_headers(),
params={'since': since, 'limit': 100}
)
if response.status_code == 200:
threats = response.json().get('threats', [])
if threats:
# Save timestamp of newest threat
newest = max(t['detected_at'] for t in threats)
save_poll_time(newest)
return threats
return []
Step 3: Filtering Relevant Threats
Not every threat requires action. Filter based on your organization's domains and risk tolerance.
# Configuration
MONITORED_DOMAINS = ['company.com', 'company.co.uk']
MIN_SEVERITY = 'medium' # low, medium, high, critical
SEVERITY_LEVELS = {
'low': 1,
'medium': 2,
'high': 3,
'critical': 4
}
def is_relevant(threat):
# Check domain match
affected_domains = threat.get('affected_domains', [])
domain_match = any(
any(d in affected for d in MONITORED_DOMAINS)
for affected in affected_domains
)
# Check severity threshold
severity = threat.get('severity', 'low')
meets_severity = SEVERITY_LEVELS.get(severity, 0) >= SEVERITY_LEVELS[MIN_SEVERITY]
return domain_match and meets_severity
def filter_threats(threats):
return [t for t in threats if is_relevant(t)]
Step 4: Sending Notifications
When relevant threats are detected, notify your team immediately.
SLACK_WEBHOOK = os.environ.get('SLACK_WEBHOOK_URL')
def send_slack_alert(threat):
severity_emoji = {
'critical': '🚨',
'high': '⚠️',
'medium': '📢',
'low': 'ℹ️'
}
emoji = severity_emoji.get(threat['severity'], '📢')
message = {
'blocks': [
{
'type': 'header',
'text': {
'type': 'plain_text',
'text': f'{emoji} Dark Web Alert: {threat["type"]}'
}
},
{
'type': 'section',
'fields': [
{'type': 'mrkdwn', 'text': f'*Severity:* {threat["severity"]}'},
{'type': 'mrkdwn', 'text': f'*Detected:* {threat["detected_at"]}'},
{'type': 'mrkdwn', 'text': f'*Source:* {threat["source"]}'}
]
},
{
'type': 'section',
'text': {
'type': 'mrkdwn',
'text': threat.get('summary', 'No summary available')
}
}
]
}
requests.post(SLACK_WEBHOOK, json=message)
Step 5: Automated Response
The real power is automated response. Here's an example that forces password resets for exposed credentials.
OKTA_DOMAIN = os.environ.get('OKTA_DOMAIN')
OKTA_TOKEN = os.environ.get('OKTA_API_TOKEN')
def force_password_reset(email):
# Find user in Okta
response = requests.get(
f'https://{OKTA_DOMAIN}/api/v1/users',
params={'search': f'profile.email eq "{email}"'},
headers={'Authorization': f'SSWS {OKTA_TOKEN}'}
)
users = response.json()
if not users:
return False
user_id = users[0]['id']
# Expire password
requests.post(
f'https://{OKTA_DOMAIN}/api/v1/users/{user_id}/lifecycle/expire_password',
headers={'Authorization': f'SSWS {OKTA_TOKEN}'}
)
return True
def handle_credential_leak(threat):
if threat['type'] != 'credential_leak':
return
exposed_emails = threat.get('exposed_credentials', [])
for cred in exposed_emails:
email = cred.get('email')
if email and any(d in email for d in MONITORED_DOMAINS):
success = force_password_reset(email)
if success:
print(f'Password reset forced for: {email}')
Step 6: Complete Pipeline
Here's the complete pipeline that runs on a schedule:
import time
import schedule
def run_pipeline():
print(f'[{datetime.utcnow()}] Running threat detection pipeline...')
# 1. Fetch new threats
threats = fetch_new_threats()
print(f' Found {len(threats)} new threats')
# 2. Filter relevant threats
relevant = filter_threats(threats)
print(f' {len(relevant)} threats match our criteria')
# 3. Process each threat
for threat in relevant:
# Send notification
send_slack_alert(threat)
# Handle credential leaks specially
if threat['type'] == 'credential_leak':
handle_credential_leak(threat)
print(' Pipeline complete')
# Run every 5 minutes
schedule.every(5).minutes.do(run_pipeline)
# Initial run
run_pipeline()
# Keep running
while True:
schedule.run_pending()
time.sleep(1)
Webhook Alternative
If your API supports webhooks, you can receive real-time alerts instead of polling.
from flask import Flask, request, jsonify
import hmac
import hashlib
app = Flask(__name__)
WEBHOOK_SECRET = os.environ.get('WEBHOOK_SECRET')
def verify_signature(payload, signature):
expected = hmac.new(
WEBHOOK_SECRET.encode(),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected)
@app.route('/webhook/threats', methods=['POST'])
def receive_threat():
# Verify webhook signature
signature = request.headers.get('X-Signature')
if not verify_signature(request.data, signature):
return jsonify({'error': 'Invalid signature'}), 401
threat = request.json
# Process threat immediately
if is_relevant(threat):
send_slack_alert(threat)
handle_credential_leak(threat)
return jsonify({'status': 'processed'}), 200
if __name__ == '__main__':
app.run(port=5000)
Ready to Build Your Pipeline?
Get your AdverseMonitor API key and start building automated threat detection today. Free trial includes full API access.
Start Free TrialNext Steps
- Add SIEM integration: Send events to Splunk, Sentinel, or Elastic
- Create tickets: Automatically open Jira or ServiceNow tickets
- Build dashboards: Store threats in a database for trend analysis
- Add enrichment: Cross-reference with VirusTotal or Shodan
- Deploy to cloud: Run on AWS Lambda, Azure Functions, or GCP
Conclusion
Automated threat detection with dark web APIs isn't complex—it's just connecting the pieces. The code in this tutorial gives you a working foundation. Customize the filters, add your notification channels, and connect your identity provider for automated response.
The difference between effective and ineffective threat intelligence is automation. When a credential leak hits the dark web at 3 AM, you want passwords resetting automatically—not waiting for someone to check a dashboard.