---
name: rodriguez-threat-hunter-playbook
description: Apply Roberto Rodriguez's threat hunting methodology with the Threat Hunter Playbook and HELK. Emphasizes documented hunts, open source infrastructure, and data-driven hunting. Use when building hunting programs or developing hunt playbooks.
---
# Roberto Rodriguez — Threat Hunter Playbook
## Overview
Roberto Rodriguez is a Principal Threat Researcher at Microsoft and creator of the Threat Hunter Playbook and HELK (Hunting ELK). His work democratized threat hunting by providing open-source infrastructure, documented methodologies, and reproducible hunt procedures.
## References
- **Threat Hunter Playbook**: https://threathunterplaybook.com/
- **HELK**: https://github.com/Cyb3rWard0g/HELK
- **GitHub**: https://github.com/Cyb3rWard0g
- **Open Threat Research**: https://github.com/OTRF
## Core Philosophy
> "Share knowledge, not just indicators."
> "If you can't reproduce it, you can't improve it."
> "The best defense is an educated community."
Rodriguez believes that threat hunting knowledge should be open, reproducible, and accessible. His playbooks document not just what to hunt, but how to think about hunting.
## Key Contributions
### Threat Hunter Playbook
Community-driven library of documented hunts mapped to ATT&CK, with queries, notebooks, and methodology.
### HELK (Hunting ELK)
Open source hunting platform combining Elasticsearch, Logstash, Kibana with Jupyter notebooks for interactive analysis.
### Mordor Datasets
Pre-recorded attack datasets for testing detections without needing a lab; the project continues as OTRF's Security Datasets.
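Because the datasets are plain JSON event exports, a hunt can be dry-run with nothing but pandas before any infrastructure exists. A minimal sketch, assuming a locally downloaded line-delimited JSON file with Sysmon-style field names (the filename and field names are illustrative; check the dataset's README for its actual layout):

```python
# Load a downloaded Mordor/Security Datasets export and slice out
# Sysmon process-access events (Event ID 10). Path and schema are
# assumptions for illustration.
import pandas as pd

events = pd.read_json("empire_mimikatz_logonpasswords.json", lines=True)
lsass_access = events[
    (events["EventID"] == 10) &
    (events["TargetImage"].str.endswith("lsass.exe", na=False))
]
print(lsass_access[["SourceImage", "GrantedAccess"]].value_counts().head(10))
```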
## When Implementing
### Always
- Document every hunt with methodology
- Map hunts to ATT&CK techniques
- Use Jupyter notebooks for reproducibility
- Share successful hunts with the team
- Test detections with Mordor datasets
- Build on community playbooks
### Never
- Hunt without a hypothesis
- Keep successful methodologies private
- Deploy without testing against known attacks
- Ignore the importance of data quality
- Skip documentation of findings
### Prefer
- Interactive notebooks over static queries
- Reproducible hunts over one-time searches
- Community playbooks as starting points
- Data-driven hypotheses over intuition
- Open source tools over vendor lock-in
## Implementation Patterns
### Hunt Playbook Structure
```python
# playbook.py
# Threat Hunter Playbook structure
from dataclasses import dataclass
from datetime import datetime
from typing import List


@dataclass
class DataSource:
    """Required data for the hunt."""
    name: str
    category: str                # e.g., "Process Monitoring"
    platforms: List[str]         # e.g., ["Windows"]
    collection_method: str       # e.g., "Sysmon Event ID 1"
    fields_required: List[str]


@dataclass
class AnalyticStep:
    """Single step in hunt analytics."""
    step_number: int
    description: str
    query: str
    query_language: str          # e.g., "KQL", "SPL", "SQL"
    expected_output: str
    interpretation: str


@dataclass
class HuntPlaybook:
    """Complete hunt playbook - Rodriguez style."""
    # Metadata
    id: str
    title: str
    description: str
    author: str
    created: datetime
    modified: datetime
    # ATT&CK mapping
    tactics: List[str]
    techniques: List[str]
    # Hunt details
    hypothesis: str
    technical_context: str       # Background on the technique
    # Requirements
    data_sources: List[DataSource]
    platforms: List[str]
    # Analytics
    analytics: List[AnalyticStep]
    # Validation
    mordor_datasets: List[str]   # Datasets for testing
    # Results
    expected_benign: List[str]
    known_bypasses: List[str]
    # References
    references: List[str]

    def to_jupyter_notebook(self) -> dict:
        """Export the playbook as a Jupyter notebook (nbformat 4)."""
        cells = []

        # Title cell
        cells.append({
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                f"# {self.title}\n",
                "\n",
                f"**Author:** {self.author}\n",
                f"**Created:** {self.created}\n",
                "\n",
                "## ATT&CK Mapping\n",
                f"- **Tactics:** {', '.join(self.tactics)}\n",
                f"- **Techniques:** {', '.join(self.techniques)}\n",
            ]
        })

        # Hypothesis cell
        cells.append({
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "## Hypothesis\n",
                f"\n{self.hypothesis}\n",
                "\n## Technical Context\n",
                f"\n{self.technical_context}\n",
            ]
        })

        # Analytics: one markdown description plus one code cell per step
        for analytic in self.analytics:
            cells.append({
                "cell_type": "markdown",
                "metadata": {},
                "source": [
                    f"### Step {analytic.step_number}: {analytic.description}\n",
                    f"\n{analytic.interpretation}\n",
                ]
            })
            # Code cells need metadata/execution_count/outputs to be valid nbformat
            cells.append({
                "cell_type": "code",
                "metadata": {},
                "execution_count": None,
                "outputs": [],
                "source": [analytic.query],
            })

        return {
            "cells": cells,
            "metadata": {
                "kernelspec": {
                    "display_name": "PySpark",
                    "language": "python",
                    "name": "pyspark",
                }
            },
            "nbformat": 4,
            "nbformat_minor": 4,
        }

    def validate_with_mordor(self) -> dict:
        """Instructions for validating with Mordor datasets."""
        return {
            "datasets": self.mordor_datasets,
            "instructions": [
                "1. Download Mordor dataset from https://mordordatasets.com/",
                "2. Import into your SIEM/hunting platform",
                "3. Run analytics from this playbook",
                "4. Verify detection of simulated attack",
                "5. Document any required tuning",
            ],
        }


# Example playbook: Credential Dumping via LSASS
lsass_playbook = HuntPlaybook(
    id="WIN-190625024610",
    title="Credential Dumping via LSASS Memory Access",
    description="Detect credential dumping by monitoring processes "
                "that access LSASS memory",
    author="Roberto Rodriguez @Cyb3rWard0g",
    created=datetime(2024, 1, 15),
    modified=datetime(2024, 2, 20),
    tactics=["Credential Access"],
    techniques=["T1003.001"],
    hypothesis="Adversaries might be accessing LSASS process memory "
               "to extract credentials",
    technical_context="""
The Local Security Authority Subsystem Service (LSASS) stores
credentials in memory for single sign-on. Attackers commonly
target LSASS using tools like Mimikatz, procdump, or comsvcs.dll.
Sysmon Event ID 10 captures process access events, including when
a process reads another process's memory.
""",
    data_sources=[
        DataSource(
            name="Process Access",
            category="Process Monitoring",
            platforms=["Windows"],
            collection_method="Sysmon Event ID 10",
            fields_required=[
                "SourceProcessGUID",
                "SourceImage",
                "TargetImage",
                "GrantedAccess",
            ],
        )
    ],
    platforms=["Windows"],
    analytics=[
        AnalyticStep(
            step_number=1,
            description="Find processes accessing LSASS",
            query="""
SELECT
    SourceImage,
    TargetImage,
    GrantedAccess,
    COUNT(*) as AccessCount
FROM sysmon_events
WHERE EventCode = 10
    AND TargetImage LIKE '%lsass.exe'
    AND SourceImage NOT LIKE '%MsMpEng.exe'
    AND SourceImage NOT LIKE '%csrss.exe'
GROUP BY SourceImage, TargetImage, GrantedAccess
ORDER BY AccessCount DESC
""",
            query_language="SQL (Spark)",
            expected_output="List of processes accessing LSASS with counts",
            interpretation="Look for unusual processes or high access counts",
        ),
        AnalyticStep(
            step_number=2,
            description="Analyze granted access rights",
            query="""
SELECT
    SourceImage,
    GrantedAccess,
    CASE
        WHEN GrantedAccess IN ('0x1010', '0x1410', '0x1438', '0x143a')
            THEN 'SUSPICIOUS - Memory Read Access'
        ELSE 'Likely Benign'
    END as Assessment
FROM sysmon_events
WHERE EventCode = 10
    AND TargetImage LIKE '%lsass.exe'
""",
            query_language="SQL (Spark)",
            expected_output="Access rights analysis",
            interpretation="Memory read access (0x1010, 0x1410) indicates "
                           "potential credential dumping",
        ),
    ],
    mordor_datasets=[
        "https://mordordatasets.com/notebooks/small/windows/06_credential_access/"
    ],
    expected_benign=[
        "Windows Defender (MsMpEng.exe)",
        "Client Server Runtime (csrss.exe)",
        "System process",
        "Antivirus products",
    ],
    known_bypasses=[
        "Direct syscalls (bypass Sysmon hooking)",
        "Targeting SAM/SECURITY registry instead",
        "Using MiniDumpWriteDump variations",
    ],
    references=[
        "https://attack.mitre.org/techniques/T1003/001/",
        "https://github.com/gentilkiwi/mimikatz",
        "https://threathunterplaybook.com/notebooks/windows/06_credential_access/",
    ],
)
```
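Since `to_jupyter_notebook()` returns a plain nbformat-4 dictionary, exporting a shareable notebook is one `json.dump` away. A minimal usage sketch against the example playbook above (the output filename is arbitrary):

```python
import json

# Export the example playbook as a shareable, re-runnable notebook.
notebook = lsass_playbook.to_jupyter_notebook()
with open("WIN-190625024610.ipynb", "w", encoding="utf-8") as f:
    json.dump(notebook, f, indent=1)

# Print the validation checklist for the analyst running the hunt.
for step in lsass_playbook.validate_with_mordor()["instructions"]:
    print(step)
```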
### HELK-Style Hunting Platform
```python
# helk_hunt.py
# Hunting with HELK-style infrastructure
from dataclasses import dataclass
from typing import Any, Dict, List

import pandas as pd


@dataclass
class HuntingPlatform:
    """HELK-inspired hunting infrastructure."""
    elasticsearch_url: str
    spark_master: str
    jupyter_url: str

    def query_elastic(self, query: Dict) -> pd.DataFrame:
        """Query Elasticsearch and return the hits as a DataFrame."""
        from elasticsearch import Elasticsearch

        es = Elasticsearch([self.elasticsearch_url])
        # `body=` is the elasticsearch-py 7.x style; the 8.x client
        # prefers passing top-level keys like query= directly.
        response = es.search(
            index="logs-*",
            body=query,
            size=10000
        )
        hits = response['hits']['hits']
        return pd.DataFrame([hit['_source'] for hit in hits])

    def hunt_with_spark(self, sql_query: str) -> pd.DataFrame:
        """Execute Spark SQL for large-scale hunting."""
        from pyspark.sql import SparkSession

        spark = (
            SparkSession.builder
            .master(self.spark_master)
            .appName("ThreatHunting")
            .getOrCreate()
        )
        result = spark.sql(sql_query)
        return result.toPandas()


class InteractiveHunt:
    """Jupyter notebook-based interactive hunt."""

    def __init__(self, platform: HuntingPlatform):
        self.platform = platform
        self.findings: List[Dict] = []
        self.timeline: List[Dict] = []

    def search(self, query: str, time_range: str = "24h") -> pd.DataFrame:
        """Execute a search and track it in the investigation timeline."""
        self.timeline.append({
            'action': 'search',
            'query': query,
            'time_range': time_range
        })
        # Build the Elasticsearch query
        elastic_query = {
            "query": {
                "bool": {
                    "must": [
                        {"query_string": {"query": query}},
                        {"range": {"@timestamp": {"gte": f"now-{time_range}"}}}
                    ]
                }
            }
        }
        return self.platform.query_elastic(elastic_query)

    def filter_noise(self, df: pd.DataFrame,
                     column: str,
                     exclude: List[str]) -> pd.DataFrame:
        """Filter known benign activity."""
        self.timeline.append({
            'action': 'filter',
            'column': column,
            'excluded': exclude
        })
        mask = ~df[column].isin(exclude)
        return df[mask]

    def pivot(self, df: pd.DataFrame,
              pivot_field: str,
              pivot_value: Any) -> pd.DataFrame:
        """Pivot the investigation to related events.

        The incoming DataFrame is the analyst's current working set;
        the pivot itself re-queries the platform over a wider window.
        """
        self.timeline.append({
            'action': 'pivot',
            'field': pivot_field,
            'value': pivot_value
        })
        query = f'{pivot_field}:"{pivot_value}"'
        return self.search(query, time_range="7d")

    def mark_finding(self, description: str,
                     evidence: pd.DataFrame,
                     severity: str):
        """Document a finding."""
        self.findings.append({
            'description': description,
            'evidence_count': len(evidence),
            'severity': severity,
            'evidence_sample': evidence.head(5).to_dict()
        })

    def generate_report(self) -> str:
        """Generate a hunt report in Markdown."""
        report = "# Hunt Report\n\n"
        report += "## Investigation Timeline\n\n"
        for i, step in enumerate(self.timeline, 1):
            report += f"{i}. **{step['action'].title()}**: "
            if step['action'] == 'search':
                report += f"Query: `{step['query']}`\n"
            elif step['action'] == 'filter':
                report += f"Excluded {len(step['excluded'])} values from {step['column']}\n"
            elif step['action'] == 'pivot':
                report += f"Pivoted on {step['field']}={step['value']}\n"
        report += "\n## Findings\n\n"
        for i, finding in enumerate(self.findings, 1):
            report += f"### Finding {i}: {finding['description']}\n"
            report += f"- **Severity**: {finding['severity']}\n"
            report += f"- **Evidence Count**: {finding['evidence_count']}\n\n"
        return report
```
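A hunt session then reads like a narrative: search, filter, pivot, document. An illustrative session for the LSASS hypothesis above; the endpoints are placeholders and the query field names (`event_id`, `TargetImage`, `SourceImage`) are assumptions that depend on your actual index mapping:

```python
# Illustrative session; assumes a running HELK-style stack with
# Sysmon data indexed into Elasticsearch.
platform = HuntingPlatform(
    elasticsearch_url="http://localhost:9200",
    spark_master="spark://localhost:7077",
    jupyter_url="http://localhost:8888",
)
hunt = InteractiveHunt(platform)

# Hypothesis-driven search, then trim known-benign sources.
df = hunt.search('event_id:10 AND TargetImage:*lsass.exe', time_range="24h")
df = hunt.filter_noise(df, column="SourceImage",
                       exclude=["C:\\Windows\\System32\\csrss.exe"])

if not df.empty:
    hunt.mark_finding("Unexpected LSASS access", evidence=df, severity="high")

print(hunt.generate_report())
```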
### Mordor Dataset Integration
```python
# mordor_testing.py
# Test detections with Mordor attack datasets
from dataclasses import dataclass
from typing import Dict, List

import requests


@dataclass
class MordorDataset:
    """Mordor attack dataset reference."""
    id: str
    name: str
    description: str
    attack_technique: str
    platform: str
    download_url: str
    file_format: str             # "json" or "csv"
    # Expected indicators
    expected_processes: List[str]
    expected_files: List[str]
    expected_network: List[str]

    def download(self, output_path: str) -> str:
        """Download the dataset to a local file."""
        response = requests.get(self.download_url)
        response.raise_for_status()
        with open(output_path, 'wb') as f:
            f.write(response.content)
        return output_path

    def load_events(self) -> List[Dict]:
        """Load events from the dataset."""
        response = requests.get(self.download_url)
        response.raise_for_status()
        if self.file_format == 'json':
            return response.json()
        # Other formats (e.g., csv) are out of scope for this sketch
        raise NotImplementedError(f"Unsupported format: {self.file_format}")


class DetectionValidator:
    """Validate detections against Mordor datasets."""

    def __init__(self):
        self.results: List[Dict] = []

    def test_detection(self,
                       detection_query: str,
                       dataset: MordorDataset,
                       query_executor) -> dict:
        """Test whether a detection finds the attack in Mordor data."""
        # Load the Mordor dataset into the test environment
        events = dataset.load_events()

        # Index events
        for event in events:
            query_executor.index(event)

        # Run the detection
        matches = query_executor.search(detection_query)

        # Assess results
        result = {
            'dataset': dataset.id,
            'technique': dataset.attack_technique,
            'total_events': len(events),
            'matches': len(matches),
            'detected': len(matches) > 0,
            'expected_processes_found': self._check_indicators(
                matches, 'process.name', dataset.expected_processes
            )
        }
        self.results.append(result)
        return result

    def _check_indicators(self,
                          matches: List[Dict],
                          field: str,
                          expected: List[str]) -> List[str]:
        """Check which expected indicators were found."""
        found = []
        for match in matches:
            value = match.get(field)
            if value in expected:
                found.append(value)
        return list(set(found))

    def coverage_report(self) -> str:
        """Generate a detection coverage report."""
        report = "# Detection Validation Report\n\n"
        detected = sum(1 for r in self.results if r['detected'])
        total = len(self.results)
        rate = (detected / total * 100) if total else 0.0   # guard empty runs
        report += f"**Overall Detection Rate**: {detected}/{total} "
        report += f"({rate:.1f}%)\n\n"
        report += "## Results by Technique\n\n"
        for result in self.results:
            status = "✅" if result['detected'] else "❌"
            report += f"- {status} **{result['technique']}**: "
            report += f"{result['matches']} matches in {result['total_events']} events\n"
        return report
```
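Because `DetectionValidator` only needs an object with `index` and `search` methods, it can be exercised end to end without a SIEM. A self-contained sketch with a toy in-memory executor; the dataset metadata, stubbed events, and substring matching are all illustrative, not real Mordor values:

```python
# Toy executor so the validator runs offline; a real query_executor
# would wrap Elasticsearch, Splunk, etc.
class InMemoryExecutor:
    def __init__(self):
        self.events = []

    def index(self, event):
        self.events.append(event)

    def search(self, query):
        # Deliberately simplistic: substring match on process.name
        return [e for e in self.events
                if query in str(e.get("process.name", ""))]

dataset = MordorDataset(
    id="SDWIN-EXAMPLE",                               # illustrative ID
    name="Empire Mimikatz logonpasswords",
    description="Credential dumping via LSASS read",
    attack_technique="T1003.001",
    platform="Windows",
    download_url="https://example.org/dataset.json",  # placeholder URL
    file_format="json",
    expected_processes=["mimikatz.exe"],
    expected_files=[],
    expected_network=[],
)
# Stub the download so this example runs without network access.
dataset.load_events = lambda: [
    {"process.name": "mimikatz.exe"},
    {"process.name": "explorer.exe"},
]

validator = DetectionValidator()
validator.test_detection("mimikatz", dataset, InMemoryExecutor())
print(validator.coverage_report())
```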
## Mental Model
Rodriguez approaches threat hunting by asking:
1. **Is this documented?** Can others reproduce this hunt?
2. **Is this mapped?** What ATT&CK technique does this address?
3. **Can I test this?** Do I have Mordor data to validate?
4. **Is this shareable?** Can the community benefit?
5. **Is this interactive?** Can I explore and pivot?
## Signature Rodriguez Moves
- Jupyter notebooks for reproducible hunts
- Mordor datasets for validation
- ATT&CK mapping for all playbooks
- Open source hunting infrastructure (HELK)
- Community playbook contribution
- Data-driven hypothesis generation
## Summary
This skill applies Roberto Rodriguez's threat hunting methodology using the Threat Hunter Playbook and HELK patterns. It emphasizes documented, reproducible hunts, open-source infrastructure, and data-driven hypotheses for building scalable hunting programs: use it to create playbooks, run interactive notebook hunts, and validate analytics with Mordor datasets.
The skill encodes a playbook structure that captures metadata, ATT&CK mapping, hypothesis, required data sources, analytic steps, and validation artifacts. It provides HELK-style primitives for querying Elasticsearch, executing Spark SQL, running interactive Jupyter hunts, filtering noise, pivoting investigations, and exporting reproducible notebooks and reports. Validation routines outline how to test detections against curated Mordor datasets.
## FAQ
**Do I need HELK to use this methodology?**
No. HELK is an opinionated open-source stack for interactive hunting, but the methodology and playbook structure apply to any SIEM or analytics platform that supports queries and notebooks.
**How do I validate a playbook before deployment?**
Use curated attack datasets like Mordor to run the analytic steps, verify expected outputs, tune thresholds, and document any false positives before enabling production alerts.