Об'єднуємо всі навички з курсу в повноцінну систему аналізу. Це не навчальний приклад — це робочий інструмент, який можна адаптувати під реальні задачі.
Архітектура системи
osint_system/
├── config.py # Конфігурація
├── main.py # Точка входу
├── database.py # Робота з SQLite
├── collectors/
│ ├── __init__.py
│ ├── file_collector.py # Імпорт з файлів
│ └── web_collector.py # Збір з веб
├── analyzers/
│ ├── __init__.py
│ ├── text_analyzer.py # Аналіз тексту
│ ├── geo_analyzer.py # Геоаналіз
│ └── time_analyzer.py # Часові патерни
├── reports/
│ ├── __init__.py
│ └── html_report.py # Генерація звітів
├── data/ # Вхідні дані
├── output/ # Результати
└── logs/ # Логи
"""Модуль роботи з базою даних"""
import sqlite3
import pandas as pd
from datetime import datetime
from config import DB_PATH
import logging
logger = logging.getLogger(__name__)
class Database:
def __init__(self, db_path=DB_PATH):
self.db_path = db_path
self.conn = None
self._connect()
self._init_schema()
def _connect(self):
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row
logger.info(f"Підключено до {self.db_path}")
def _init_schema(self):
cursor = self.conn.cursor()
# Таблиця подій
cursor.execute('''
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
event_type TEXT,
category TEXT,
sector TEXT,
lat REAL,
lon REAL,
source TEXT,
raw_text TEXT,
callsigns TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# Таблиця сутностей
cursor.execute('''
CREATE TABLE IF NOT EXISTS entities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE,
entity_type TEXT,
first_seen TEXT,
last_seen TEXT,
event_count INTEGER DEFAULT 0,
metadata TEXT
)
''')
# Індекси
cursor.execute('CREATE INDEX IF NOT EXISTS idx_ev_ts ON events(timestamp)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_ev_cat ON events(category)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_ev_sector ON events(sector)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_ent_name ON entities(name)')
self.conn.commit()
def add_event(self, **kwargs):
cursor = self.conn.cursor()
columns = ', '.join(kwargs.keys())
placeholders = ', '.join(['?' for _ in kwargs])
cursor.execute(
f'INSERT INTO events ({columns}) VALUES ({placeholders})',
list(kwargs.values())
)
self.conn.commit()
return cursor.lastrowid
def add_entity(self, name, entity_type, timestamp):
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO entities (name, entity_type, first_seen, last_seen, event_count)
VALUES (?, ?, ?, ?, 1)
ON CONFLICT(name) DO UPDATE SET
last_seen = ?,
event_count = event_count + 1
''', (name, entity_type, timestamp, timestamp, timestamp))
self.conn.commit()
def query(self, sql, params=None):
return pd.read_sql_query(sql, self.conn, params=params or [])
def get_events(self, **filters):
sql = 'SELECT * FROM events WHERE 1=1'
params = []
for key, value in filters.items():
if value:
sql += f' AND {key} = ?'
params.append(value)
sql += ' ORDER BY timestamp DESC'
return self.query(sql, params)
def get_stats(self):
stats = {}
cursor = self.conn.cursor()
cursor.execute('SELECT COUNT(*) FROM events')
stats['total_events'] = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM entities')
stats['total_entities'] = cursor.fetchone()[0]
stats['by_category'] = dict(self.query(
'SELECT category, COUNT(*) as cnt FROM events GROUP BY category'
).values)
stats['by_sector'] = dict(self.query(
'SELECT sector, COUNT(*) as cnt FROM events WHERE sector IS NOT NULL GROUP BY sector'
).values)
return stats
def close(self):
if self.conn:
self.conn.close()
logger.info("З'єднання закрито")
Файл 3: analyzers/text_analyzer.py
"""Аналіз тексту повідомлень"""
import re
from collections import Counter
from config import EVENT_CATEGORIES, STOP_WORDS
import logging
logger = logging.getLogger(__name__)
class TextAnalyzer:
def __init__(self):
self.categories = EVENT_CATEGORIES
self.stop_words = STOP_WORDS
def categorize(self, text):
"""Визначає категорію повідомлення"""
text_lower = text.lower()
scores = {}
for category, keywords in self.categories.items():
score = sum(1 for kw in keywords if kw in text_lower)
if score > 0:
scores[category] = score
if scores:
return max(scores, key=scores.get)
return 'other'
def extract_callsigns(self, text):
"""Витягує позивні"""
# Патерн: Слово-Число або Слово з великої
patterns = [
r'\b([А-ЯІЇЄҐ][а-яіїєґ]+)-?(\d+)\b', # Сокіл-1
r'\b([A-Z][a-z]+)-?(\d+)\b', # Alpha-2
]
callsigns = []
for pattern in patterns:
matches = re.findall(pattern, text)
for match in matches:
callsign = f"{match[0]}-{match[1]}" if match[1] else match[0]
callsigns.append(callsign)
return list(set(callsigns))
def extract_coordinates(self, text):
"""Витягує координати"""
# Десяткові
decimal = re.findall(r'(\d+\.\d+)[,\s]\s*(\d+\.\d+)', text)
coords = []
for lat, lon in decimal:
lat_f, lon_f = float(lat), float(lon)
if 44 < lat_f < 53 and 22 < lon_f < 41: # Україна
coords.append((lat_f, lon_f))
return coords
def extract_time(self, text):
"""Витягує час"""
patterns = [
r'\[(\d{2}:\d{2})\]', # [08:23]
r'о\s+(\d{2}:\d{2})', # о 08:23
r'(\d{2}:\d{2}:\d{2})', # 08:23:00
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
return match.group(1)
return None
def get_word_freq(self, texts):
"""Частота слів у списку текстів"""
all_words = []
for text in texts:
# Очищення
clean = re.sub(r'[^\w\s]', '', text.lower())
words = [w for w in clean.split() if w not in self.stop_words and len(w) > 2]
all_words.extend(words)
return Counter(all_words).most_common(20)
def analyze_message(self, text, timestamp=None):
"""Повний аналіз повідомлення"""
return {
'category': self.categorize(text),
'callsigns': self.extract_callsigns(text),
'coordinates': self.extract_coordinates(text),
'time': self.extract_time(text) or timestamp,
'raw_text': text
}
Файл 4: analyzers/geo_analyzer.py
"""Геопросторовий аналіз"""
from geopy.distance import geodesic
import folium
from config import OUTPUT_DIR
import logging
logger = logging.getLogger(__name__)
class GeoAnalyzer:
def __init__(self):
self.cluster_distance = 2 # км
def calculate_distance(self, coord1, coord2):
"""Відстань між точками в км"""
return geodesic(coord1, coord2).kilometers
def find_nearby(self, center, points, radius_km=5):
"""Точки в радіусі"""
nearby = []
for p in points:
coord = (p['lat'], p['lon'])
dist = self.calculate_distance(center, coord)
if dist <= radius_km:
p['distance'] = round(dist, 2)
nearby.append(p)
return sorted(nearby, key=lambda x: x['distance'])
def cluster_points(self, points):
"""Проста кластеризація"""
if not points:
return []
clusters = []
used = set()
for i, p1 in enumerate(points):
if i in used:
continue
cluster = [p1]
used.add(i)
for j, p2 in enumerate(points):
if j in used:
continue
dist = self.calculate_distance(
(p1['lat'], p1['lon']),
(p2['lat'], p2['lon'])
)
if dist <= self.cluster_distance:
cluster.append(p2)
used.add(j)
clusters.append(cluster)
return clusters
def generate_map(self, events_df, filename='map.html'):
"""Генерує інтерактивну карту"""
# Фільтруємо точки з координатами
geo_df = events_df.dropna(subset=['lat', 'lon'])
if len(geo_df) == 0:
logger.warning("Немає даних з координатами")
return None
# Центр карти
center = [geo_df['lat'].mean(), geo_df['lon'].mean()]
m = folium.Map(location=center, zoom_start=10)
# Кольори категорій
colors = {
'movement': 'blue',
'fire': 'red',
'contact': 'orange',
'comms': 'green',
'logistics': 'purple',
'other': 'gray'
}
for _, row in geo_df.iterrows():
color = colors.get(row.get('category', 'other'), 'gray')
popup_text = f"""
<b>{row.get('timestamp', 'N/A')}</b><br>
Тип: {row.get('category', 'N/A')}<br>
Сектор: {row.get('sector', 'N/A')}<br>
"""
folium.CircleMarker(
location=[row['lat'], row['lon']],
radius=8,
color=color,
fill=True,
popup=popup_text
).add_to(m)
output_path = OUTPUT_DIR / filename
m.save(str(output_path))
logger.info(f"Карту збережено: {output_path}")
return output_path
"""Головний скрипт системи"""
import logging
from datetime import datetime
from pathlib import Path
from config import LOG_DIR, LOG_FORMAT, LOG_LEVEL, DATA_DIR
from database import Database
from analyzers.text_analyzer import TextAnalyzer
from analyzers.geo_analyzer import GeoAnalyzer
from reports.html_report import ReportGenerator
# Налаштування логування
logging.basicConfig(
level=LOG_LEVEL,
format=LOG_FORMAT,
handlers=[
logging.FileHandler(LOG_DIR / f"osint_{datetime.now():%Y%m%d}.log", encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger('main')
def process_messages(messages, db, text_analyzer):
"""Обробляє список повідомлень"""
processed = 0
for msg in messages:
# Аналіз тексту
analysis = text_analyzer.analyze_message(msg['text'], msg.get('timestamp'))
# Координати (якщо є)
lat, lon = None, None
if analysis['coordinates']:
lat, lon = analysis['coordinates'][0]
# Зберігаємо подію
db.add_event(
timestamp=analysis['time'],
event_type=msg.get('type', 'message'),
category=analysis['category'],
sector=msg.get('sector'),
lat=lat,
lon=lon,
source=msg.get('source', 'unknown'),
raw_text=analysis['raw_text'],
callsigns=','.join(analysis['callsigns'])
)
# Зберігаємо сутності (позивні)
for callsign in analysis['callsigns']:
db.add_entity(callsign, 'callsign', analysis['time'])
processed += 1
return processed
def load_test_data():
"""Завантажує тестові дані"""
return [
{'timestamp': '2025-01-15 08:23', 'text': '[08:23] Сокіл-1: Група на позиції 48.4567, 35.0234, зв\'язок встановлено', 'sector': 'A'},
{'timestamp': '2025-01-15 08:45', 'text': '[08:45] Орел-2: Переміщення до точки Альфа, координати 48.5123, 35.1456', 'sector': 'B'},
{'timestamp': '2025-01-15 09:12', 'text': '[09:12] Сокіл-1: Вогневий контакт на позиції, потрібна підтримка', 'sector': 'A'},
{'timestamp': '2025-01-15 09:30', 'text': '[09:30] Грім-3: Виявлено рух противника, координати 49.1234, 36.5678', 'sector': 'C'},
{'timestamp': '2025-01-15 10:15', 'text': '[10:15] Орел-2: Прибув на точку, зв\'язок стабільний', 'sector': 'B'},
{'timestamp': '2025-01-15 10:45', 'text': '[10:45] Сокіл-1: Контакт завершено, запит на евакуацію', 'sector': 'A'},
{'timestamp': '2025-01-15 11:00', 'text': '[11:00] Беркут-1: Спостереження з позиції 48.4800, 35.0500', 'sector': 'A'},
{'timestamp': '2025-01-15 11:30', 'text': '[11:30] Грім-3: Артилерія по квадрату, очікуємо результат', 'sector': 'C'},
]
def main():
logger.info("=" * 60)
logger.info("ЗАПУСК OSINT СИСТЕМИ")
logger.info("=" * 60)
# Ініціалізація
db = Database()
text_analyzer = TextAnalyzer()
geo_analyzer = GeoAnalyzer()
report_gen = ReportGenerator()
try:
# 1. Завантаження даних
logger.info("Крок 1: Завантаження даних")
messages = load_test_data()
logger.info(f" Завантажено: {len(messages)} повідомлень")
# 2. Обробка
logger.info("Крок 2: Обробка повідомлень")
processed = process_messages(messages, db, text_analyzer)
logger.info(f" Оброблено: {processed}")
# 3. Аналіз
logger.info("Крок 3: Аналіз")
stats = db.get_stats()
logger.info(f" Подій у базі: {stats['total_events']}")
logger.info(f" Сутностей: {stats['total_entities']}")
# Частота слів
events_df = db.query('SELECT * FROM events')
word_freq = text_analyzer.get_word_freq(events_df['raw_text'].tolist())
# 4. Карта
logger.info("Крок 4: Генерація карти")
geo_analyzer.generate_map(events_df)
# 5. Звіт
logger.info("Крок 5: Генерація звіту")
report_path = report_gen.generate(stats, word_freq, events_df)
logger.info("=" * 60)
logger.info("ЗАВЕРШЕНО УСПІШНО")
logger.info(f"Звіт: {report_path}")
logger.info("=" * 60)
except Exception as e:
logger.error(f"ПОМИЛКА: {e}", exc_info=True)
raise
finally:
db.close()
if __name__ == '__main__':
main()
Запуск системи
# Встановлення залежностей
pip install pandas geopy folium
# Створення структури
mkdir -p osint_system/{collectors,analyzers,reports,data,output,logs}
touch osint_system/collectors/__init__.py
touch osint_system/analyzers/__init__.py
touch osint_system/reports/__init__.py
# Запуск
cd osint_system
python main.py
Очікуваний результат
2025-01-15 12:00:00 | INFO | main | ============================================================
2025-01-15 12:00:00 | INFO | main | ЗАПУСК OSINT СИСТЕМИ
2025-01-15 12:00:00 | INFO | main | ============================================================
2025-01-15 12:00:00 | INFO | database | Підключено до osint.db
2025-01-15 12:00:00 | INFO | main | Крок 1: Завантаження даних
2025-01-15 12:00:00 | INFO | main | Завантажено: 8 повідомлень
2025-01-15 12:00:00 | INFO | main | Крок 2: Обробка повідомлень
2025-01-15 12:00:00 | INFO | main | Оброблено: 8
2025-01-15 12:00:00 | INFO | main | Крок 3: Аналіз
2025-01-15 12:00:00 | INFO | main | Подій у базі: 8
2025-01-15 12:00:00 | INFO | main | Сутностей: 4
2025-01-15 12:00:01 | INFO | geo_analyzer | Карту збережено: output/map.html
2025-01-15 12:00:01 | INFO | html_report | Звіт збережено: output/report_20250115_1200.html
2025-01-15 12:00:01 | INFO | main | ============================================================
2025-01-15 12:00:01 | INFO | main | ЗАВЕРШЕНО УСПІШНО
2025-01-15 12:00:01 | INFO | main | ============================================================
Розширення системи
Система модульна — легко додавати нові можливості:
Telegram-бот — отримання даних та сповіщень
Web-інтерфейс — Flask/FastAPI для браузерного доступу
API інтеграції — підключення зовнішніх джерел
ML-класифікація — автоматична категоризація
Сповіщення — алерти при виявленні патернів
Підсумок курсу
За 10 уроків ви освоїли:
✓ Вітаємо! Ви маєте всі необхідні навички для створення власних інструментів аналізу. Головне — практика. Адаптуйте систему під свої задачі, експериментуйте, питайте AI.
Фінальний чек-лист
☐
Структура проєкту створена
☐
Усі модулі на місці
☐
main.py запускається
☐
База даних наповнюється
☐
Карта генерується
☐
HTML-звіт створюється
☐
Логи записуються