Files
Phantom/release/test-billion-domains.py

680 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Phantom TLD System - Billion Domains Scalability Test
Тестирование масштабируемости с миллиардами доменов
Автор: Phantom Protocol Team 2025
Версия: 1.0.0
"""
import asyncio
import aiohttp
import time
import json
import random
import string
import hashlib
import statistics
import concurrent.futures
import threading
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Generator
import argparse
import logging
import sys
import sqlite3
import psutil
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as plt
import seaborn as sns
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('billion-domains-test.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
@dataclass
class ScalabilityConfig:
"""Конфигурация тестирования масштабируемости"""
tld_nodes: List[str]
target_domains: int = 1_000_000_000 # 1 миллиард доменов
batch_size: int = 10_000 # Размер пакета для регистрации
concurrent_workers: int = 100
shard_count: int = 256
test_phases: List[str] = None
def __post_init__(self):
if self.test_phases is None:
self.test_phases = [
"domain_generation",
"shard_distribution",
"batch_registration",
"query_performance",
"memory_usage",
"consensus_load"
]
class DomainShardCalculator:
"""Калькулятор шардинга доменов"""
def __init__(self, shard_count: int = 256):
self.shard_count = shard_count
def calculate_shard(self, domain: str) -> int:
"""Вычисление номера шарда для домена"""
domain_hash = hashlib.sha256(domain.encode()).hexdigest()
return int(domain_hash[:8], 16) % self.shard_count
def get_shard_distribution(self, domains: List[str]) -> Dict[int, int]:
"""Получение распределения доменов по шардам"""
distribution = defaultdict(int)
for domain in domains:
shard = self.calculate_shard(domain)
distribution[shard] += 1
return dict(distribution)
class BillionDomainGenerator:
"""Генератор миллиардов доменов"""
def __init__(self, target_count: int = 1_000_000_000):
self.target_count = target_count
self.generated_count = 0
def generate_domain_batch(self, batch_size: int, tld: str = "phantom") -> List[str]:
"""Генерация пакета доменов"""
domains = []
for i in range(batch_size):
if self.generated_count >= self.target_count:
break
# Генерация уникального домена
domain_id = self.generated_count
# Различные паттерны доменов для реалистичности
if domain_id % 10 == 0:
# Корпоративные домены
domain = f"company-{domain_id:010d}.{tld}"
elif domain_id % 10 == 1:
# Персональные домены
domain = f"user-{domain_id:010d}.{tld}"
elif domain_id % 10 == 2:
# Сервисные домены
domain = f"service-{domain_id:010d}.{tld}"
elif domain_id % 10 == 3:
# API домены
domain = f"api-{domain_id:010d}.{tld}"
elif domain_id % 10 == 4:
# CDN домены
domain = f"cdn-{domain_id:010d}.{tld}"
else:
# Случайные домены
random_part = ''.join(random.choices(string.ascii_lowercase, k=8))
domain = f"{random_part}-{domain_id:010d}.{tld}"
domains.append(domain)
self.generated_count += 1
return domains
def generate_streaming(self, batch_size: int = 10000, tld: str = "phantom") -> Generator[List[str], None, None]:
"""Потоковая генерация доменов"""
while self.generated_count < self.target_count:
batch = self.generate_domain_batch(batch_size, tld)
if not batch:
break
yield batch
class ScalabilityDatabase:
"""База данных для отслеживания тестирования масштабируемости"""
def __init__(self, db_path: str = "scalability_test.db"):
self.db_path = db_path
self.init_database()
def init_database(self):
"""Инициализация базы данных"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Таблица для доменов
cursor.execute("""
CREATE TABLE IF NOT EXISTS domains (
id INTEGER PRIMARY KEY AUTOINCREMENT,
domain TEXT UNIQUE NOT NULL,
shard_id INTEGER NOT NULL,
registered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
registration_time REAL,
status TEXT DEFAULT 'pending'
)
""")
# Таблица для метрик производительности
cursor.execute("""
CREATE TABLE IF NOT EXISTS performance_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metric_name TEXT NOT NULL,
metric_value REAL NOT NULL,
additional_data TEXT
)
""")
# Таблица для результатов тестов
cursor.execute("""
CREATE TABLE IF NOT EXISTS test_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
test_name TEXT NOT NULL,
start_time TIMESTAMP,
end_time TIMESTAMP,
domains_processed INTEGER,
success_rate REAL,
avg_response_time REAL,
throughput REAL,
additional_metrics TEXT
)
""")
conn.commit()
conn.close()
def record_domain_batch(self, domains: List[str], shard_calculator: DomainShardCalculator):
"""Запись пакета доменов в базу данных"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
domain_data = []
for domain in domains:
shard_id = shard_calculator.calculate_shard(domain)
domain_data.append((domain, shard_id))
cursor.executemany(
"INSERT OR IGNORE INTO domains (domain, shard_id) VALUES (?, ?)",
domain_data
)
conn.commit()
conn.close()
def get_domain_count(self) -> int:
"""Получение общего количества доменов"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM domains")
count = cursor.fetchone()[0]
conn.close()
return count
def get_shard_statistics(self) -> Dict[str, Any]:
"""Получение статистики по шардам"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT shard_id, COUNT(*) as domain_count
FROM domains
GROUP BY shard_id
ORDER BY shard_id
""")
shard_data = cursor.fetchall()
conn.close()
if not shard_data:
return {}
shard_counts = [count for _, count in shard_data]
return {
'total_shards': len(shard_data),
'min_domains_per_shard': min(shard_counts),
'max_domains_per_shard': max(shard_counts),
'avg_domains_per_shard': statistics.mean(shard_counts),
'std_domains_per_shard': statistics.stdev(shard_counts) if len(shard_counts) > 1 else 0,
'shard_distribution': dict(shard_data)
}
class BillionDomainTester:
"""Основной класс для тестирования миллиардов доменов"""
def __init__(self, config: ScalabilityConfig):
self.config = config
self.generator = BillionDomainGenerator(config.target_domains)
self.shard_calculator = DomainShardCalculator(config.shard_count)
self.database = ScalabilityDatabase()
self.session = None
async def __aenter__(self):
"""Асинхронный контекст менеджер - вход"""
self.session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=60),
connector=aiohttp.TCPConnector(limit=1000)
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Асинхронный контекст менеджер - выход"""
if self.session:
await self.session.close()
async def test_domain_generation_performance(self) -> Dict[str, Any]:
"""Тест производительности генерации доменов"""
logger.info("Тестирование производительности генерации доменов...")
start_time = time.time()
total_generated = 0
generation_times = []
# Генерация 1 миллиона доменов для теста
test_target = 1_000_000
for batch in self.generator.generate_streaming(self.config.batch_size):
batch_start = time.time()
# Запись в базу данных
self.database.record_domain_batch(batch, self.shard_calculator)
batch_time = time.time() - batch_start
generation_times.append(batch_time)
total_generated += len(batch)
if total_generated >= test_target:
break
if total_generated % 100_000 == 0:
logger.info(f"Сгенерировано {total_generated:,} доменов")
end_time = time.time()
total_time = end_time - start_time
return {
'total_domains_generated': total_generated,
'total_time': total_time,
'domains_per_second': total_generated / total_time,
'avg_batch_time': statistics.mean(generation_times),
'min_batch_time': min(generation_times),
'max_batch_time': max(generation_times)
}
async def test_shard_distribution(self) -> Dict[str, Any]:
"""Тест распределения доменов по шардам"""
logger.info("Тестирование распределения по шардам...")
# Получение статистики из базы данных
shard_stats = self.database.get_shard_statistics()
# Анализ равномерности распределения
if shard_stats and 'shard_distribution' in shard_stats:
distribution = list(shard_stats['shard_distribution'].values())
# Коэффициент вариации (должен быть близок к 0 для равномерного распределения)
cv = shard_stats['std_domains_per_shard'] / shard_stats['avg_domains_per_shard'] if shard_stats['avg_domains_per_shard'] > 0 else 0
# Оценка качества распределения
if cv < 0.1:
distribution_quality = "Отличное"
elif cv < 0.2:
distribution_quality = "Хорошее"
elif cv < 0.3:
distribution_quality = "Удовлетворительное"
else:
distribution_quality = "Плохое"
shard_stats['coefficient_of_variation'] = cv
shard_stats['distribution_quality'] = distribution_quality
return shard_stats
async def test_batch_registration_performance(self) -> Dict[str, Any]:
"""Тест производительности пакетной регистрации"""
logger.info("Тестирование производительности пакетной регистрации...")
start_time = time.time()
total_registered = 0
registration_times = []
success_count = 0
# Тестирование с 100,000 доменов
test_batches = 10
async def register_batch(domains: List[str]) -> tuple[int, float]:
"""Регистрация пакета доменов"""
batch_start = time.time()
successful = 0
# Параллельная регистрация доменов в пакете
semaphore = asyncio.Semaphore(50) # Ограничение concurrency
async def register_single_domain(domain: str) -> bool:
async with semaphore:
try:
node = random.choice(self.config.tld_nodes)
url = f"http://{node}/api/domains"
domain_parts = domain.split('.')
payload = {
"domain": domain_parts[0],
"tld": domain_parts[1],
"ipv4": f"192.168.{random.randint(1, 254)}.{random.randint(1, 254)}",
"ttl": 3600
}
async with self.session.post(url, json=payload) as response:
return response.status in [200, 201]
except Exception as e:
logger.debug(f"Ошибка регистрации {domain}: {e}")
return False
# Выполнение регистрации
tasks = [register_single_domain(domain) for domain in domains]
results = await asyncio.gather(*tasks, return_exceptions=True)
successful = sum(1 for result in results if result is True)
batch_time = time.time() - batch_start
return successful, batch_time
# Выполнение тестовых пакетов
for i in range(test_batches):
batch = self.generator.generate_domain_batch(self.config.batch_size)
if not batch:
break
successful, batch_time = await register_batch(batch)
registration_times.append(batch_time)
success_count += successful
total_registered += len(batch)
logger.info(f"Пакет {i+1}/{test_batches}: {successful}/{len(batch)} успешно, {batch_time:.2f}с")
end_time = time.time()
total_time = end_time - start_time
return {
'total_domains_attempted': total_registered,
'total_domains_successful': success_count,
'success_rate': success_count / total_registered if total_registered > 0 else 0,
'total_time': total_time,
'domains_per_second': success_count / total_time if total_time > 0 else 0,
'avg_batch_time': statistics.mean(registration_times) if registration_times else 0,
'min_batch_time': min(registration_times) if registration_times else 0,
'max_batch_time': max(registration_times) if registration_times else 0
}
async def test_query_performance_at_scale(self) -> Dict[str, Any]:
"""Тест производительности запросов при большом масштабе"""
logger.info("Тестирование производительности запросов при большом масштабе...")
# Получение случайных доменов из базы данных для тестирования
conn = sqlite3.connect(self.database.db_path)
cursor = conn.cursor()
cursor.execute("SELECT domain FROM domains WHERE status = 'pending' ORDER BY RANDOM() LIMIT 1000")
test_domains = [row[0] for row in cursor.fetchall()]
conn.close()
if not test_domains:
logger.warning("Нет доменов для тестирования запросов")
return {}
start_time = time.time()
query_times = []
successful_queries = 0
async def query_domain(domain: str) -> tuple[bool, float]:
"""Выполнение DNS запроса"""
query_start = time.time()
try:
# Имитация DNS запроса через HTTP API
node = random.choice(self.config.tld_nodes)
url = f"http://{node}/api/domains/{domain}"
async with self.session.get(url) as response:
success = response.status == 200
query_time = time.time() - query_start
return success, query_time
except Exception as e:
logger.debug(f"Ошибка запроса {domain}: {e}")
query_time = time.time() - query_start
return False, query_time
# Параллельное выполнение запросов
semaphore = asyncio.Semaphore(100)
async def limited_query(domain: str):
async with semaphore:
return await query_domain(domain)
# Выполнение всех запросов
tasks = [limited_query(domain) for domain in test_domains]
results = await asyncio.gather(*tasks)
# Анализ результатов
for success, query_time in results:
query_times.append(query_time)
if success:
successful_queries += 1
end_time = time.time()
total_time = end_time - start_time
return {
'total_queries': len(test_domains),
'successful_queries': successful_queries,
'success_rate': successful_queries / len(test_domains) if test_domains else 0,
'total_time': total_time,
'queries_per_second': len(test_domains) / total_time if total_time > 0 else 0,
'avg_query_time': statistics.mean(query_times) if query_times else 0,
'p95_query_time': np.percentile(query_times, 95) if query_times else 0,
'p99_query_time': np.percentile(query_times, 99) if query_times else 0
}
def test_memory_usage_projection(self) -> Dict[str, Any]:
"""Тест проекции использования памяти"""
logger.info("Анализ проекции использования памяти...")
# Текущее использование памяти
current_memory = psutil.virtual_memory()
process = psutil.Process()
process_memory = process.memory_info()
# Оценка памяти на домен (примерно)
current_domains = self.database.get_domain_count()
if current_domains > 0:
memory_per_domain = process_memory.rss / current_domains
else:
memory_per_domain = 1024 # 1KB на домен (оценка)
# Проекция для миллиарда доменов
projected_memory_gb = (memory_per_domain * self.config.target_domains) / (1024**3)
# Оценка требований к кэшу
cache_hit_ratio = 0.8 # Предполагаемый коэффициент попаданий в кэш
active_domains_ratio = 0.1 # 10% доменов активно используются
active_domains = self.config.target_domains * active_domains_ratio
cache_memory_gb = (active_domains * memory_per_domain * cache_hit_ratio) / (1024**3)
return {
'current_memory_usage_mb': process_memory.rss / (1024**2),
'current_domains': current_domains,
'memory_per_domain_bytes': memory_per_domain,
'projected_total_memory_gb': projected_memory_gb,
'projected_cache_memory_gb': cache_memory_gb,
'recommended_ram_gb': max(64, projected_memory_gb * 1.5), # С запасом
'sharding_benefit': projected_memory_gb / self.config.shard_count
}
def generate_scalability_report(self, test_results: Dict[str, Any]) -> str:
"""Генерация отчета о масштабируемости"""
report = []
report.append("# Phantom TLD System - Отчет о тестировании масштабируемости")
report.append(f"Цель: {self.config.target_domains:,} доменов")
report.append(f"Дата: {time.strftime('%Y-%m-%d %H:%M:%S')}")
report.append("")
# Результаты по фазам
for phase, results in test_results.items():
if not results:
continue
report.append(f"## {phase.replace('_', ' ').title()}")
if phase == "domain_generation":
report.append(f"- Сгенерировано доменов: {results.get('total_domains_generated', 0):,}")
report.append(f"- Скорость генерации: {results.get('domains_per_second', 0):,.0f} доменов/сек")
report.append(f"- Время генерации пакета: {results.get('avg_batch_time', 0):.3f}с")
elif phase == "shard_distribution":
report.append(f"- Всего шардов: {results.get('total_shards', 0)}")
report.append(f"- Среднее доменов на шард: {results.get('avg_domains_per_shard', 0):,.0f}")
report.append(f"- Коэффициент вариации: {results.get('coefficient_of_variation', 0):.3f}")
report.append(f"- Качество распределения: {results.get('distribution_quality', 'Неизвестно')}")
elif phase == "batch_registration":
report.append(f"- Успешность регистрации: {results.get('success_rate', 0):.1%}")
report.append(f"- Скорость регистрации: {results.get('domains_per_second', 0):,.0f} доменов/сек")
report.append(f"- Время обработки пакета: {results.get('avg_batch_time', 0):.2f}с")
elif phase == "query_performance":
report.append(f"- Успешность запросов: {results.get('success_rate', 0):.1%}")
report.append(f"- Скорость запросов: {results.get('queries_per_second', 0):,.0f} запросов/сек")
report.append(f"- P95 время отклика: {results.get('p95_query_time', 0)*1000:.1f}мс")
elif phase == "memory_usage":
report.append(f"- Прогнозируемая память: {results.get('projected_total_memory_gb', 0):.1f} ГБ")
report.append(f"- Рекомендуемая RAM: {results.get('recommended_ram_gb', 0):.0f} ГБ")
report.append(f"- Память на домен: {results.get('memory_per_domain_bytes', 0):.0f} байт")
report.append("")
# Общие выводы
report.append("## Выводы о масштабируемости")
# Анализ возможности обработки миллиарда доменов
domain_gen = test_results.get('domain_generation', {})
if domain_gen.get('domains_per_second', 0) > 10000:
report.append("✅ Генерация доменов: Отличная производительность")
else:
report.append("⚠️ Генерация доменов: Требуется оптимизация")
shard_dist = test_results.get('shard_distribution', {})
if shard_dist.get('coefficient_of_variation', 1) < 0.1:
report.append("✅ Шардинг: Равномерное распределение")
else:
report.append("⚠️ Шардинг: Неравномерное распределение")
batch_reg = test_results.get('batch_registration', {})
if batch_reg.get('success_rate', 0) > 0.95:
report.append("✅ Регистрация: Высокая надежность")
else:
report.append("⚠️ Регистрация: Требуется улучшение надежности")
memory_usage = test_results.get('memory_usage', {})
if memory_usage.get('recommended_ram_gb', 0) < 1000:
report.append("✅ Память: Разумные требования к ресурсам")
else:
report.append("⚠️ Память: Высокие требования к ресурсам")
# Итоговая оценка
report.append("")
report.append("## Итоговая оценка масштабируемости")
# Расчет времени для регистрации миллиарда доменов
if batch_reg.get('domains_per_second', 0) > 0:
time_for_billion = self.config.target_domains / batch_reg['domains_per_second']
days = time_for_billion / (24 * 3600)
report.append(f"Время регистрации {self.config.target_domains:,} доменов: {days:.1f} дней")
if days < 30:
report.append("🎯 **ГОТОВНОСТЬ К МИЛЛИАРДУ ДОМЕНОВ: ВЫСОКАЯ**")
elif days < 90:
report.append("🎯 **ГОТОВНОСТЬ К МИЛЛИАРДУ ДОМЕНОВ: СРЕДНЯЯ**")
else:
report.append("🎯 **ГОТОВНОСТЬ К МИЛЛИАРДУ ДОМЕНОВ: ТРЕБУЕТСЯ ОПТИМИЗАЦИЯ**")
return "\n".join(report)
async def main():
"""Главная функция"""
parser = argparse.ArgumentParser(description="Phantom TLD System Billion Domains Scalability Test")
parser.add_argument("--tld-nodes", nargs="+", default=["localhost:8053"],
help="TLD узлы для тестирования")
parser.add_argument("--target-domains", type=int, default=1_000_000,
help="Целевое количество доменов для тестирования")
parser.add_argument("--batch-size", type=int, default=1000,
help="Размер пакета для операций")
parser.add_argument("--concurrent-workers", type=int, default=50,
help="Количество параллельных воркеров")
parser.add_argument("--output", default="billion-domains-report.md",
help="Файл для сохранения отчета")
args = parser.parse_args()
# Создание конфигурации
config = ScalabilityConfig(
tld_nodes=args.tld_nodes,
target_domains=args.target_domains,
batch_size=args.batch_size,
concurrent_workers=args.concurrent_workers
)
logger.info(f"Запуск тестирования масштабируемости до {config.target_domains:,} доменов")
# Выполнение тестов
async with BillionDomainTester(config) as tester:
test_results = {}
# Выполнение всех фаз тестирования
for phase in config.test_phases:
logger.info(f"Выполнение фазы: {phase}")
try:
if phase == "domain_generation":
result = await tester.test_domain_generation_performance()
elif phase == "shard_distribution":
result = await tester.test_shard_distribution()
elif phase == "batch_registration":
result = await tester.test_batch_registration_performance()
elif phase == "query_performance":
result = await tester.test_query_performance_at_scale()
elif phase == "memory_usage":
result = tester.test_memory_usage_projection()
else:
logger.warning(f"Неизвестная фаза тестирования: {phase}")
continue
test_results[phase] = result
logger.info(f"Фаза {phase} завершена успешно")
except Exception as e:
logger.error(f"Ошибка в фазе {phase}: {e}")
test_results[phase] = {}
# Генерация отчета
report = tester.generate_scalability_report(test_results)
# Сохранение отчета
with open(args.output, 'w', encoding='utf-8') as f:
f.write(report)
logger.info(f"Отчет сохранен в {args.output}")
print(f"\nТестирование масштабируемости завершено. Отчет сохранен в {args.output}")
if __name__ == "__main__":
asyncio.run(main())