Модуль 10: Фінальний проєкт: OSINT-система

УРОК 10

Час2-3 години
ПотрібноУроки 1-9
РезультатПовноцінна OSINT-система

Мета

Об'єднуємо всі навички з курсу в повноцінну систему аналізу. Це не навчальний приклад — це робочий інструмент, який можна адаптувати під реальні задачі.

Архітектура системи

osint_system/ ├── config.py # Конфігурація ├── main.py # Точка входу ├── database.py # Робота з SQLite ├── collectors/ │ ├── __init__.py │ ├── file_collector.py # Імпорт з файлів │ └── web_collector.py # Збір з веб ├── analyzers/ │ ├── __init__.py │ ├── text_analyzer.py # Аналіз тексту │ ├── geo_analyzer.py # Геоаналіз │ └── time_analyzer.py # Часові патерни ├── reports/ │ ├── __init__.py │ └── html_report.py # Генерація звітів ├── data/ # Вхідні дані ├── output/ # Результати └── logs/ # Логи

Файл 1: config.py

"""Конфігурація системи""" from pathlib import Path import logging # Шляхи BASE_DIR = Path(__file__).parent DATA_DIR = BASE_DIR / 'data' OUTPUT_DIR = BASE_DIR / 'output' LOG_DIR = BASE_DIR / 'logs' DB_PATH = BASE_DIR / 'osint.db' # Створюємо папки for d in [DATA_DIR, OUTPUT_DIR, LOG_DIR]: d.mkdir(exist_ok=True) # Налаштування логування LOG_FORMAT = '%(asctime)s | %(levelname)s | %(name)s | %(message)s' LOG_LEVEL = logging.INFO # Категорії подій EVENT_CATEGORIES = { 'movement': ['переміщення', 'рух', 'прибув', 'вийшов', 'марш'], 'fire': ['вогневий', 'обстріл', 'артилерія', 'удар', 'пострілів'], 'contact': ['контакт', 'виявлено', 'спостереження', 'противник'], 'comms': ['зв\'язок', 'радіо', 'сигнал', 'частота', 'позивний'], 'logistics': ['постачання', 'паливо', 'боєприпаси', 'евакуація'] } # Стоп-слова (українські) STOP_WORDS = { 'і', 'в', 'на', 'що', 'це', 'до', 'з', 'не', 'та', 'як', 'від', 'за', 'по', 'для', 'але', 'або', 'ще', 'вже', 'так', 'був', 'буде', 'є', 'їх', 'його', 'її', 'ми', 'ви', 'вони' }

Файл 2: database.py

"""Модуль роботи з базою даних""" import sqlite3 import pandas as pd from datetime import datetime from config import DB_PATH import logging logger = logging.getLogger(__name__) class Database: def __init__(self, db_path=DB_PATH): self.db_path = db_path self.conn = None self._connect() self._init_schema() def _connect(self): self.conn = sqlite3.connect(self.db_path) self.conn.row_factory = sqlite3.Row logger.info(f"Підключено до {self.db_path}") def _init_schema(self): cursor = self.conn.cursor() # Таблиця подій cursor.execute(''' CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT, event_type TEXT, category TEXT, sector TEXT, lat REAL, lon REAL, source TEXT, raw_text TEXT, callsigns TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP ) ''') # Таблиця сутностей cursor.execute(''' CREATE TABLE IF NOT EXISTS entities ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE, entity_type TEXT, first_seen TEXT, last_seen TEXT, event_count INTEGER DEFAULT 0, metadata TEXT ) ''') # Індекси cursor.execute('CREATE INDEX IF NOT EXISTS idx_ev_ts ON events(timestamp)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_ev_cat ON events(category)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_ev_sector ON events(sector)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_ent_name ON entities(name)') self.conn.commit() def add_event(self, **kwargs): cursor = self.conn.cursor() columns = ', '.join(kwargs.keys()) placeholders = ', '.join(['?' for _ in kwargs]) cursor.execute( f'INSERT INTO events ({columns}) VALUES ({placeholders})', list(kwargs.values()) ) self.conn.commit() return cursor.lastrowid def add_entity(self, name, entity_type, timestamp): cursor = self.conn.cursor() cursor.execute(''' INSERT INTO entities (name, entity_type, first_seen, last_seen, event_count) VALUES (?, ?, ?, ?, 1) ON CONFLICT(name) DO UPDATE SET last_seen = ?, event_count = event_count + 1 ''', (name, entity_type, timestamp, timestamp, timestamp)) self.conn.commit() def query(self, sql, params=None): return pd.read_sql_query(sql, self.conn, params=params or []) def get_events(self, **filters): sql = 'SELECT * FROM events WHERE 1=1' params = [] for key, value in filters.items(): if value: sql += f' AND {key} = ?' params.append(value) sql += ' ORDER BY timestamp DESC' return self.query(sql, params) def get_stats(self): stats = {} cursor = self.conn.cursor() cursor.execute('SELECT COUNT(*) FROM events') stats['total_events'] = cursor.fetchone()[0] cursor.execute('SELECT COUNT(*) FROM entities') stats['total_entities'] = cursor.fetchone()[0] stats['by_category'] = dict(self.query( 'SELECT category, COUNT(*) as cnt FROM events GROUP BY category' ).values) stats['by_sector'] = dict(self.query( 'SELECT sector, COUNT(*) as cnt FROM events WHERE sector IS NOT NULL GROUP BY sector' ).values) return stats def close(self): if self.conn: self.conn.close() logger.info("З'єднання закрито")

Файл 3: analyzers/text_analyzer.py

"""Аналіз тексту повідомлень""" import re from collections import Counter from config import EVENT_CATEGORIES, STOP_WORDS import logging logger = logging.getLogger(__name__) class TextAnalyzer: def __init__(self): self.categories = EVENT_CATEGORIES self.stop_words = STOP_WORDS def categorize(self, text): """Визначає категорію повідомлення""" text_lower = text.lower() scores = {} for category, keywords in self.categories.items(): score = sum(1 for kw in keywords if kw in text_lower) if score > 0: scores[category] = score if scores: return max(scores, key=scores.get) return 'other' def extract_callsigns(self, text): """Витягує позивні""" # Патерн: Слово-Число або Слово з великої patterns = [ r'\b([А-ЯІЇЄҐ][а-яіїєґ]+)-?(\d+)\b', # Сокіл-1 r'\b([A-Z][a-z]+)-?(\d+)\b', # Alpha-2 ] callsigns = [] for pattern in patterns: matches = re.findall(pattern, text) for match in matches: callsign = f"{match[0]}-{match[1]}" if match[1] else match[0] callsigns.append(callsign) return list(set(callsigns)) def extract_coordinates(self, text): """Витягує координати""" # Десяткові decimal = re.findall(r'(\d+\.\d+)[,\s]\s*(\d+\.\d+)', text) coords = [] for lat, lon in decimal: lat_f, lon_f = float(lat), float(lon) if 44 < lat_f < 53 and 22 < lon_f < 41: # Україна coords.append((lat_f, lon_f)) return coords def extract_time(self, text): """Витягує час""" patterns = [ r'\[(\d{2}:\d{2})\]', # [08:23] r'о\s+(\d{2}:\d{2})', # о 08:23 r'(\d{2}:\d{2}:\d{2})', # 08:23:00 ] for pattern in patterns: match = re.search(pattern, text) if match: return match.group(1) return None def get_word_freq(self, texts): """Частота слів у списку текстів""" all_words = [] for text in texts: # Очищення clean = re.sub(r'[^\w\s]', '', text.lower()) words = [w for w in clean.split() if w not in self.stop_words and len(w) > 2] all_words.extend(words) return Counter(all_words).most_common(20) def analyze_message(self, text, timestamp=None): """Повний аналіз повідомлення""" return { 'category': self.categorize(text), 'callsigns': self.extract_callsigns(text), 'coordinates': self.extract_coordinates(text), 'time': self.extract_time(text) or timestamp, 'raw_text': text }

Файл 4: analyzers/geo_analyzer.py

"""Геопросторовий аналіз""" from geopy.distance import geodesic import folium from config import OUTPUT_DIR import logging logger = logging.getLogger(__name__) class GeoAnalyzer: def __init__(self): self.cluster_distance = 2 # км def calculate_distance(self, coord1, coord2): """Відстань між точками в км""" return geodesic(coord1, coord2).kilometers def find_nearby(self, center, points, radius_km=5): """Точки в радіусі""" nearby = [] for p in points: coord = (p['lat'], p['lon']) dist = self.calculate_distance(center, coord) if dist <= radius_km: p['distance'] = round(dist, 2) nearby.append(p) return sorted(nearby, key=lambda x: x['distance']) def cluster_points(self, points): """Проста кластеризація""" if not points: return [] clusters = [] used = set() for i, p1 in enumerate(points): if i in used: continue cluster = [p1] used.add(i) for j, p2 in enumerate(points): if j in used: continue dist = self.calculate_distance( (p1['lat'], p1['lon']), (p2['lat'], p2['lon']) ) if dist <= self.cluster_distance: cluster.append(p2) used.add(j) clusters.append(cluster) return clusters def generate_map(self, events_df, filename='map.html'): """Генерує інтерактивну карту""" # Фільтруємо точки з координатами geo_df = events_df.dropna(subset=['lat', 'lon']) if len(geo_df) == 0: logger.warning("Немає даних з координатами") return None # Центр карти center = [geo_df['lat'].mean(), geo_df['lon'].mean()] m = folium.Map(location=center, zoom_start=10) # Кольори категорій colors = { 'movement': 'blue', 'fire': 'red', 'contact': 'orange', 'comms': 'green', 'logistics': 'purple', 'other': 'gray' } for _, row in geo_df.iterrows(): color = colors.get(row.get('category', 'other'), 'gray') popup_text = f""" <b>{row.get('timestamp', 'N/A')}</b><br> Тип: {row.get('category', 'N/A')}<br> Сектор: {row.get('sector', 'N/A')}<br> """ folium.CircleMarker( location=[row['lat'], row['lon']], radius=8, color=color, fill=True, popup=popup_text ).add_to(m) output_path = OUTPUT_DIR / filename m.save(str(output_path)) logger.info(f"Карту збережено: {output_path}") return output_path

Файл 5: reports/html_report.py

"""Генерація HTML-звітів""" from datetime import datetime from config import OUTPUT_DIR import logging logger = logging.getLogger(__name__) class ReportGenerator: def generate(self, stats, word_freq, events_df): """Генерує повний HTML-звіт""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M') # Таблиця категорій cat_rows = ''.join( f"<tr><td>{cat}</td><td>{cnt}</td></tr>" for cat, cnt in stats.get('by_category', {}).items() ) # Таблиця секторів sector_rows = ''.join( f"<tr><td>{sec}</td><td>{cnt}</td></tr>" for sec, cnt in stats.get('by_sector', {}).items() ) # Часті слова word_rows = ''.join( f"<tr><td>{word}</td><td>{cnt}</td></tr>" for word, cnt in word_freq[:10] ) html = f"""<!DOCTYPE html> <html lang="uk"> <head> <meta charset="UTF-8"> <title>OSINT Звіт — {timestamp}</title> <style> * {{ box-sizing: border-box; }} body {{ font-family: -apple-system, Arial, sans-serif; margin: 0; padding: 20px; background: #f7fafc; }} .container {{ max-width: 1200px; margin: 0 auto; }} h1 {{ color: #2c5282; border-bottom: 3px solid #4299e1; padding-bottom: 10px; }} h2 {{ color: #2d3748; margin-top: 30px; }} .meta {{ color: #718096; margin-bottom: 30px; }} .grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 20px; }} .card {{ background: white; border-radius: 8px; padding: 20px; box-shadow: 0 1px 3px rgba(0,0,0,0.1); }} .card h3 {{ margin-top: 0; color: #4a5568; }} .stat-big {{ font-size: 48px; font-weight: bold; color: #2c5282; }} table {{ width: 100%; border-collapse: collapse; margin-top: 10px; }} th, td {{ padding: 10px; text-align: left; border-bottom: 1px solid #e2e8f0; }} th {{ background: #edf2f7; font-weight: 600; }} tr:hover {{ background: #f7fafc; }} .footer {{ margin-top: 40px; padding-top: 20px; border-top: 1px solid #e2e8f0; color: #718096; text-align: center; }} </style> </head> <body> <div class="container"> <h1>Аналітичний звіт OSINT</h1> <p class="meta">Згенеровано: {timestamp}</p> <div class="grid"> <div class="card"> <h3>Усього подій</h3> <div class="stat-big">{stats.get('total_events', 0)}</div> </div> <div class="card"> <h3>Унікальних сутностей</h3> <div class="stat-big">{stats.get('total_entities', 0)}</div> </div> </div> <h2>Розподіл по категоріях</h2> <div class="card"> <table> <tr><th>Категорія</th><th>Кількість</th></tr> {cat_rows} </table> </div> <h2>Розподіл по секторах</h2> <div class="card"> <table> <tr><th>Сектор</th><th>Кількість</th></tr> {sector_rows} </table> </div> <h2>Найчастіші слова</h2> <div class="card"> <table> <tr><th>Слово</th><th>Кількість</th></tr> {word_rows} </table> </div> <p class="footer"> Звіт згенеровано автоматично системою OSINT Analysis<br> <a href="map.html">Переглянути карту</a> </p> </div> </body> </html>""" output_path = OUTPUT_DIR / f"report_{datetime.now():%Y%m%d_%H%M}.html" with open(output_path, 'w', encoding='utf-8') as f: f.write(html) logger.info(f"Звіт збережено: {output_path}") return output_path

Файл 6: main.py

"""Головний скрипт системи""" import logging from datetime import datetime from pathlib import Path from config import LOG_DIR, LOG_FORMAT, LOG_LEVEL, DATA_DIR from database import Database from analyzers.text_analyzer import TextAnalyzer from analyzers.geo_analyzer import GeoAnalyzer from reports.html_report import ReportGenerator # Налаштування логування logging.basicConfig( level=LOG_LEVEL, format=LOG_FORMAT, handlers=[ logging.FileHandler(LOG_DIR / f"osint_{datetime.now():%Y%m%d}.log", encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger('main') def process_messages(messages, db, text_analyzer): """Обробляє список повідомлень""" processed = 0 for msg in messages: # Аналіз тексту analysis = text_analyzer.analyze_message(msg['text'], msg.get('timestamp')) # Координати (якщо є) lat, lon = None, None if analysis['coordinates']: lat, lon = analysis['coordinates'][0] # Зберігаємо подію db.add_event( timestamp=analysis['time'], event_type=msg.get('type', 'message'), category=analysis['category'], sector=msg.get('sector'), lat=lat, lon=lon, source=msg.get('source', 'unknown'), raw_text=analysis['raw_text'], callsigns=','.join(analysis['callsigns']) ) # Зберігаємо сутності (позивні) for callsign in analysis['callsigns']: db.add_entity(callsign, 'callsign', analysis['time']) processed += 1 return processed def load_test_data(): """Завантажує тестові дані""" return [ {'timestamp': '2025-01-15 08:23', 'text': '[08:23] Сокіл-1: Група на позиції 48.4567, 35.0234, зв\'язок встановлено', 'sector': 'A'}, {'timestamp': '2025-01-15 08:45', 'text': '[08:45] Орел-2: Переміщення до точки Альфа, координати 48.5123, 35.1456', 'sector': 'B'}, {'timestamp': '2025-01-15 09:12', 'text': '[09:12] Сокіл-1: Вогневий контакт на позиції, потрібна підтримка', 'sector': 'A'}, {'timestamp': '2025-01-15 09:30', 'text': '[09:30] Грім-3: Виявлено рух противника, координати 49.1234, 36.5678', 'sector': 'C'}, {'timestamp': '2025-01-15 10:15', 'text': '[10:15] Орел-2: Прибув на точку, зв\'язок стабільний', 'sector': 'B'}, {'timestamp': '2025-01-15 10:45', 'text': '[10:45] Сокіл-1: Контакт завершено, запит на евакуацію', 'sector': 'A'}, {'timestamp': '2025-01-15 11:00', 'text': '[11:00] Беркут-1: Спостереження з позиції 48.4800, 35.0500', 'sector': 'A'}, {'timestamp': '2025-01-15 11:30', 'text': '[11:30] Грім-3: Артилерія по квадрату, очікуємо результат', 'sector': 'C'}, ] def main(): logger.info("=" * 60) logger.info("ЗАПУСК OSINT СИСТЕМИ") logger.info("=" * 60) # Ініціалізація db = Database() text_analyzer = TextAnalyzer() geo_analyzer = GeoAnalyzer() report_gen = ReportGenerator() try: # 1. Завантаження даних logger.info("Крок 1: Завантаження даних") messages = load_test_data() logger.info(f" Завантажено: {len(messages)} повідомлень") # 2. Обробка logger.info("Крок 2: Обробка повідомлень") processed = process_messages(messages, db, text_analyzer) logger.info(f" Оброблено: {processed}") # 3. Аналіз logger.info("Крок 3: Аналіз") stats = db.get_stats() logger.info(f" Подій у базі: {stats['total_events']}") logger.info(f" Сутностей: {stats['total_entities']}") # Частота слів events_df = db.query('SELECT * FROM events') word_freq = text_analyzer.get_word_freq(events_df['raw_text'].tolist()) # 4. Карта logger.info("Крок 4: Генерація карти") geo_analyzer.generate_map(events_df) # 5. Звіт logger.info("Крок 5: Генерація звіту") report_path = report_gen.generate(stats, word_freq, events_df) logger.info("=" * 60) logger.info("ЗАВЕРШЕНО УСПІШНО") logger.info(f"Звіт: {report_path}") logger.info("=" * 60) except Exception as e: logger.error(f"ПОМИЛКА: {e}", exc_info=True) raise finally: db.close() if __name__ == '__main__': main()

Запуск системи

# Встановлення залежностей pip install pandas geopy folium # Створення структури mkdir -p osint_system/{collectors,analyzers,reports,data,output,logs} touch osint_system/collectors/__init__.py touch osint_system/analyzers/__init__.py touch osint_system/reports/__init__.py # Запуск cd osint_system python main.py

Очікуваний результат

2025-01-15 12:00:00 | INFO | main | ============================================================ 2025-01-15 12:00:00 | INFO | main | ЗАПУСК OSINT СИСТЕМИ 2025-01-15 12:00:00 | INFO | main | ============================================================ 2025-01-15 12:00:00 | INFO | database | Підключено до osint.db 2025-01-15 12:00:00 | INFO | main | Крок 1: Завантаження даних 2025-01-15 12:00:00 | INFO | main | Завантажено: 8 повідомлень 2025-01-15 12:00:00 | INFO | main | Крок 2: Обробка повідомлень 2025-01-15 12:00:00 | INFO | main | Оброблено: 8 2025-01-15 12:00:00 | INFO | main | Крок 3: Аналіз 2025-01-15 12:00:00 | INFO | main | Подій у базі: 8 2025-01-15 12:00:00 | INFO | main | Сутностей: 4 2025-01-15 12:00:01 | INFO | geo_analyzer | Карту збережено: output/map.html 2025-01-15 12:00:01 | INFO | html_report | Звіт збережено: output/report_20250115_1200.html 2025-01-15 12:00:01 | INFO | main | ============================================================ 2025-01-15 12:00:01 | INFO | main | ЗАВЕРШЕНО УСПІШНО 2025-01-15 12:00:01 | INFO | main | ============================================================

Розширення системи

Система модульна — легко додавати нові можливості:

Підсумок курсу

За 10 уроків ви освоїли:

✓ Вітаємо! Ви маєте всі необхідні навички для створення власних інструментів аналізу. Головне — практика. Адаптуйте систему під свої задачі, експериментуйте, питайте AI.

Фінальний чек-лист

☐ Структура проєкту створена ☐ Усі модулі на місці ☐ main.py запускається ☐ База даних наповнюється ☐ Карта генерується ☐ HTML-звіт створюється ☐ Логи записуються

Що далі?

Слава Україні! 🇺🇦