← Back to Blog
February 1, 2025 • 14 min read

Building Automated Threat Detection with Dark Web Monitoring APIs

A hands-on developer guide to building threat detection pipelines with Python and JavaScript
< 100 Lines
Code needed to build a fully functional dark web monitoring pipeline with automated alerting

Dark web monitoring APIs provide the raw threat intelligence. Your job as a developer is to transform that data into automated detection and response. This tutorial walks you through building a complete threat detection pipeline from scratch.

By the end, you'll have working code that polls for threats, filters relevant alerts, and triggers automated responses—all in under 100 lines.

Prerequisites

Step 1: API Authentication

First, let's set up secure API authentication. Never hardcode API keys—use environment variables.

Python
import os
import requests
from datetime import datetime, timedelta

# Load API key from environment
API_KEY = os.environ.get('DARKWEB_API_KEY')
API_BASE = 'https://api.adversemonitor.com/v1'

def get_headers():
    return {
        'Authorization': f'Bearer {API_KEY}',
        'Content-Type': 'application/json'
    }

# Test connection
def test_connection():
    response = requests.get(
        f'{API_BASE}/status',
        headers=get_headers()
    )
    return response.status_code == 200
JavaScript
const axios = require('axios');

const API_KEY = process.env.DARKWEB_API_KEY;
const API_BASE = 'https://api.adversemonitor.com/v1';

const apiClient = axios.create({
    baseURL: API_BASE,
    headers: {
        'Authorization': `Bearer ${API_KEY}`,
        'Content-Type': 'application/json'
    }
});

async function testConnection() {
    const response = await apiClient.get('/status');
    return response.status === 200;
}

Step 2: Building the Poller

The poller fetches new threats since the last check. We'll use a timestamp to track what we've already processed.

Python
import json
from pathlib import Path

STATE_FILE = Path('last_poll.json')

def get_last_poll_time():
    if STATE_FILE.exists():
        data = json.loads(STATE_FILE.read_text())
        return data.get('last_poll')
    # Default: 1 hour ago
    return (datetime.utcnow() - timedelta(hours=1)).isoformat()

def save_poll_time(timestamp):
    STATE_FILE.write_text(json.dumps({'last_poll': timestamp}))

def fetch_new_threats():
    since = get_last_poll_time()

    response = requests.get(
        f'{API_BASE}/threats',
        headers=get_headers(),
        params={'since': since, 'limit': 100}
    )

    if response.status_code == 200:
        threats = response.json().get('threats', [])

        if threats:
            # Save timestamp of newest threat
            newest = max(t['detected_at'] for t in threats)
            save_poll_time(newest)

        return threats

    return []

Step 3: Filtering Relevant Threats

Not every threat requires action. Filter based on your organization's domains and risk tolerance.

Python
# Configuration
MONITORED_DOMAINS = ['company.com', 'company.co.uk']
MIN_SEVERITY = 'medium'  # low, medium, high, critical

SEVERITY_LEVELS = {
    'low': 1,
    'medium': 2,
    'high': 3,
    'critical': 4
}

def is_relevant(threat):
    # Check domain match
    affected_domains = threat.get('affected_domains', [])
    domain_match = any(
        any(d in affected for d in MONITORED_DOMAINS)
        for affected in affected_domains
    )

    # Check severity threshold
    severity = threat.get('severity', 'low')
    meets_severity = SEVERITY_LEVELS.get(severity, 0) >= SEVERITY_LEVELS[MIN_SEVERITY]

    return domain_match and meets_severity

def filter_threats(threats):
    return [t for t in threats if is_relevant(t)]

Step 4: Sending Notifications

When relevant threats are detected, notify your team immediately.

Python - Slack
SLACK_WEBHOOK = os.environ.get('SLACK_WEBHOOK_URL')

def send_slack_alert(threat):
    severity_emoji = {
        'critical': '🚨',
        'high': '⚠️',
        'medium': '📢',
        'low': 'ℹ️'
    }

    emoji = severity_emoji.get(threat['severity'], '📢')

    message = {
        'blocks': [
            {
                'type': 'header',
                'text': {
                    'type': 'plain_text',
                    'text': f'{emoji} Dark Web Alert: {threat["type"]}'
                }
            },
            {
                'type': 'section',
                'fields': [
                    {'type': 'mrkdwn', 'text': f'*Severity:* {threat["severity"]}'},
                    {'type': 'mrkdwn', 'text': f'*Detected:* {threat["detected_at"]}'},
                    {'type': 'mrkdwn', 'text': f'*Source:* {threat["source"]}'}
                ]
            },
            {
                'type': 'section',
                'text': {
                    'type': 'mrkdwn',
                    'text': threat.get('summary', 'No summary available')
                }
            }
        ]
    }

    requests.post(SLACK_WEBHOOK, json=message)

Step 5: Automated Response

The real power is automated response. Here's an example that forces password resets for exposed credentials.

Python - Okta Integration
OKTA_DOMAIN = os.environ.get('OKTA_DOMAIN')
OKTA_TOKEN = os.environ.get('OKTA_API_TOKEN')

def force_password_reset(email):
    # Find user in Okta
    response = requests.get(
        f'https://{OKTA_DOMAIN}/api/v1/users',
        params={'search': f'profile.email eq "{email}"'},
        headers={'Authorization': f'SSWS {OKTA_TOKEN}'}
    )

    users = response.json()
    if not users:
        return False

    user_id = users[0]['id']

    # Expire password
    requests.post(
        f'https://{OKTA_DOMAIN}/api/v1/users/{user_id}/lifecycle/expire_password',
        headers={'Authorization': f'SSWS {OKTA_TOKEN}'}
    )

    return True

def handle_credential_leak(threat):
    if threat['type'] != 'credential_leak':
        return

    exposed_emails = threat.get('exposed_credentials', [])

    for cred in exposed_emails:
        email = cred.get('email')
        if email and any(d in email for d in MONITORED_DOMAINS):
            success = force_password_reset(email)
            if success:
                print(f'Password reset forced for: {email}')

Step 6: Complete Pipeline

Here's the complete pipeline that runs on a schedule:

Python - Full Pipeline
import time
import schedule

def run_pipeline():
    print(f'[{datetime.utcnow()}] Running threat detection pipeline...')

    # 1. Fetch new threats
    threats = fetch_new_threats()
    print(f'  Found {len(threats)} new threats')

    # 2. Filter relevant threats
    relevant = filter_threats(threats)
    print(f'  {len(relevant)} threats match our criteria')

    # 3. Process each threat
    for threat in relevant:
        # Send notification
        send_slack_alert(threat)

        # Handle credential leaks specially
        if threat['type'] == 'credential_leak':
            handle_credential_leak(threat)

    print('  Pipeline complete')

# Run every 5 minutes
schedule.every(5).minutes.do(run_pipeline)

# Initial run
run_pipeline()

# Keep running
while True:
    schedule.run_pending()
    time.sleep(1)

Webhook Alternative

If your API supports webhooks, you can receive real-time alerts instead of polling.

Python - Flask Webhook
from flask import Flask, request, jsonify
import hmac
import hashlib

app = Flask(__name__)
WEBHOOK_SECRET = os.environ.get('WEBHOOK_SECRET')

def verify_signature(payload, signature):
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(signature, expected)

@app.route('/webhook/threats', methods=['POST'])
def receive_threat():
    # Verify webhook signature
    signature = request.headers.get('X-Signature')
    if not verify_signature(request.data, signature):
        return jsonify({'error': 'Invalid signature'}), 401

    threat = request.json

    # Process threat immediately
    if is_relevant(threat):
        send_slack_alert(threat)
        handle_credential_leak(threat)

    return jsonify({'status': 'processed'}), 200

if __name__ == '__main__':
    app.run(port=5000)

Ready to Build Your Pipeline?

Get your AdverseMonitor API key and start building automated threat detection today. Free trial includes full API access.

Start Free Trial

Next Steps

Conclusion

Automated threat detection with dark web APIs isn't complex—it's just connecting the pieces. The code in this tutorial gives you a working foundation. Customize the filters, add your notification channels, and connect your identity provider for automated response.

The difference between effective and ineffective threat intelligence is automation. When a credential leak hits the dark web at 3 AM, you want passwords resetting automatically—not waiting for someone to check a dashboard.

Related Articles