Files
Phantom/release/examples/vpn-client.py

505 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Phantom VPN Client
Создает VPN подключение через Phantom сеть с использованием TUN интерфейса
Использование:
sudo python3 vpn-client.py --server phantom-vpn.local --interface tun0
Автор: Phantom Protocol Team 2025
"""
import os
import sys
import socket
import struct
import select
import threading
import argparse
import logging
import time
import subprocess
import fcntl
from typing import Optional, Tuple
# Проверка прав root
if os.geteuid() != 0:
print("Ошибка: VPN клиент требует права root для создания TUN интерфейса")
print("Запустите: sudo python3 vpn-client.py")
sys.exit(1)
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Константы для TUN интерфейса
TUNSETIFF = 0x400454ca
IFF_TUN = 0x0001
IFF_NO_PI = 0x1000
class PhantomVPNClient:
"""VPN клиент через Phantom сеть"""
def __init__(self, server_host: str, server_port: int = 1194,
interface: str = 'tun0', hops: int = 3):
self.server_host = server_host
self.server_port = server_port
self.interface = interface
self.hops = hops
self.tun_fd: Optional[int] = None
self.phantom_socket: Optional[socket.socket] = None
self.running = False
# Сетевые настройки
self.local_ip = "10.8.0.2"
self.remote_ip = "10.8.0.1"
self.netmask = "255.255.255.0"
self.mtu = 1500
def create_tun_interface(self) -> bool:
"""Создание TUN интерфейса"""
try:
logger.info(f"Создание TUN интерфейса {self.interface}")
# Открытие /dev/net/tun
self.tun_fd = os.open('/dev/net/tun', os.O_RDWR)
# Настройка интерфейса
ifr = struct.pack('16sH', self.interface.encode('utf-8'), IFF_TUN | IFF_NO_PI)
fcntl.ioctl(self.tun_fd, TUNSETIFF, ifr)
logger.info(f"TUN интерфейс {self.interface} создан")
return True
except Exception as e:
logger.error(f"Ошибка создания TUN интерфейса: {e}")
return False
def configure_interface(self) -> bool:
"""Настройка сетевого интерфейса"""
try:
logger.info(f"Настройка интерфейса {self.interface}")
# Назначение IP адреса
cmd = f"ip addr add {self.local_ip}/24 dev {self.interface}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode != 0:
logger.error(f"Ошибка назначения IP: {result.stderr}")
return False
# Поднятие интерфейса
cmd = f"ip link set {self.interface} up"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode != 0:
logger.error(f"Ошибка поднятия интерфейса: {result.stderr}")
return False
# Настройка MTU
cmd = f"ip link set {self.interface} mtu {self.mtu}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode != 0:
logger.warning(f"Не удалось установить MTU: {result.stderr}")
logger.info(f"Интерфейс {self.interface} настроен: {self.local_ip}/24")
return True
except Exception as e:
logger.error(f"Ошибка настройки интерфейса: {e}")
return False
def setup_routing(self) -> bool:
"""Настройка маршрутизации"""
try:
logger.info("Настройка маршрутизации для VPN")
# Сохранение текущего шлюза по умолчанию
result = subprocess.run("ip route show default", shell=True,
capture_output=True, text=True)
if result.returncode == 0 and result.stdout.strip():
self.original_gateway = result.stdout.strip()
logger.info(f"Сохранен оригинальный шлюз: {self.original_gateway}")
# Добавление маршрута для VPN сервера через оригинальный шлюз
# (чтобы избежать рекурсии)
if hasattr(self, 'original_gateway'):
gateway_ip = self.original_gateway.split()[2]
cmd = f"ip route add {self.server_host}/32 via {gateway_ip}"
subprocess.run(cmd, shell=True)
# Настройка маршрута по умолчанию через VPN
cmd = f"ip route add 0.0.0.0/1 dev {self.interface}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode != 0:
logger.warning(f"Не удалось добавить маршрут 0.0.0.0/1: {result.stderr}")
cmd = f"ip route add 128.0.0.0/1 dev {self.interface}"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if result.returncode != 0:
logger.warning(f"Не удалось добавить маршрут 128.0.0.0/1: {result.stderr}")
logger.info("Маршрутизация настроена")
return True
except Exception as e:
logger.error(f"Ошибка настройки маршрутизации: {e}")
return False
def connect_to_phantom_server(self) -> bool:
"""Подключение к VPN серверу через Phantom сеть"""
try:
logger.info(f"Подключение к VPN серверу {self.server_host}:{self.server_port}")
logger.info(f"Маршрут через {self.hops} хопов Phantom сети")
# Создание сокета
self.phantom_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.phantom_socket.settimeout(30)
# В реальной реализации здесь будет построение маршрута через Phantom
# Сейчас используем прямое подключение для демонстрации
try:
# Попытка подключения через Phantom (симуляция)
phantom_gateway = "127.0.0.1" # Локальный Phantom узел
phantom_port = 8050
logger.info(f"Подключение через Phantom gateway {phantom_gateway}:{phantom_port}")
self.phantom_socket.connect((phantom_gateway, phantom_port))
# Отправка команды VPN туннеля
vpn_command = f"PHANTOM_VPN_CONNECT {self.server_host} {self.server_port} {self.hops}\n"
self.phantom_socket.send(vpn_command.encode('utf-8'))
# Ожидание подтверждения
response = self.phantom_socket.recv(1024).decode('utf-8')
if response.startswith("PHANTOM_VPN_OK"):
logger.info("VPN туннель через Phantom сеть установлен")
return True
else:
logger.error(f"Ошибка установки VPN туннеля: {response}")
return False
except socket.error:
# Fallback: прямое подключение для демонстрации
logger.info(f"Fallback: прямое подключение к {self.server_host}:{self.server_port}")
self.phantom_socket.close()
self.phantom_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.phantom_socket.settimeout(30)
self.phantom_socket.connect((self.server_host, self.server_port))
# Простой VPN handshake
handshake = b"PHANTOM_VPN_CLIENT_HELLO\n"
self.phantom_socket.send(handshake)
response = self.phantom_socket.recv(1024)
if b"VPN_SERVER_HELLO" in response:
logger.info("VPN handshake завершен")
return True
else:
logger.error("Ошибка VPN handshake")
return False
except Exception as e:
logger.error(f"Ошибка подключения к VPN серверу: {e}")
return False
def start_packet_forwarding(self):
"""Запуск пересылки пакетов между TUN и Phantom сокетом"""
logger.info("Запуск пересылки пакетов")
# Поток для пересылки пакетов от TUN к сокету
tun_to_socket_thread = threading.Thread(target=self._forward_tun_to_socket)
tun_to_socket_thread.daemon = True
tun_to_socket_thread.start()
# Поток для пересылки пакетов от сокета к TUN
socket_to_tun_thread = threading.Thread(target=self._forward_socket_to_tun)
socket_to_tun_thread.daemon = True
socket_to_tun_thread.start()
return tun_to_socket_thread, socket_to_tun_thread
def _forward_tun_to_socket(self):
"""Пересылка пакетов от TUN интерфейса к Phantom сокету"""
logger.debug("Поток TUN → Socket запущен")
while self.running:
try:
# Чтение пакета из TUN интерфейса
ready, _, _ = select.select([self.tun_fd], [], [], 1.0)
if not ready:
continue
packet = os.read(self.tun_fd, 2048)
if not packet:
continue
# Отправка пакета через Phantom сокет
# Добавляем заголовок с длиной пакета
packet_length = struct.pack('!H', len(packet))
self.phantom_socket.send(packet_length + packet)
logger.debug(f"TUN → Socket: {len(packet)} байт")
except Exception as e:
if self.running:
logger.error(f"Ошибка пересылки TUN → Socket: {e}")
break
logger.debug("Поток TUN → Socket завершен")
def _forward_socket_to_tun(self):
"""Пересылка пакетов от Phantom сокета к TUN интерфейсу"""
logger.debug("Поток Socket → TUN запущен")
while self.running:
try:
# Чтение заголовка с длиной пакета
ready, _, _ = select.select([self.phantom_socket], [], [], 1.0)
if not ready:
continue
length_data = self._recv_exact(self.phantom_socket, 2)
if not length_data:
continue
packet_length = struct.unpack('!H', length_data)[0]
# Чтение самого пакета
packet = self._recv_exact(self.phantom_socket, packet_length)
if not packet:
continue
# Запись пакета в TUN интерфейс
os.write(self.tun_fd, packet)
logger.debug(f"Socket → TUN: {len(packet)} байт")
except Exception as e:
if self.running:
logger.error(f"Ошибка пересылки Socket → TUN: {e}")
break
logger.debug("Поток Socket → TUN завершен")
def _recv_exact(self, sock: socket.socket, length: int) -> bytes:
"""Получение точного количества байт из сокета"""
data = b''
while len(data) < length:
chunk = sock.recv(length - len(data))
if not chunk:
return b''
data += chunk
return data
def start(self) -> bool:
"""Запуск VPN клиента"""
try:
logger.info("🚀 Запуск Phantom VPN Client")
# Создание TUN интерфейса
if not self.create_tun_interface():
return False
# Настройка интерфейса
if not self.configure_interface():
return False
# Подключение к VPN серверу
if not self.connect_to_phantom_server():
return False
# Настройка маршрутизации
if not self.setup_routing():
return False
self.running = True
# Запуск пересылки пакетов
threads = self.start_packet_forwarding()
logger.info("✅ Phantom VPN подключение установлено")
logger.info(f" Локальный IP: {self.local_ip}")
logger.info(f" VPN сервер: {self.server_host}:{self.server_port}")
logger.info(f" Интерфейс: {self.interface}")
logger.info(f" Хопов через Phantom: {self.hops}")
logger.info(" Весь трафик теперь проходит через Phantom VPN")
return True
except Exception as e:
logger.error(f"Ошибка запуска VPN клиента: {e}")
return False
def stop(self):
"""Остановка VPN клиента"""
logger.info("Остановка Phantom VPN Client")
self.running = False
# Восстановление маршрутизации
self._restore_routing()
# Закрытие сокета
if self.phantom_socket:
self.phantom_socket.close()
self.phantom_socket = None
# Удаление TUN интерфейса
if self.tun_fd:
os.close(self.tun_fd)
self.tun_fd = None
# Удаление интерфейса
subprocess.run(f"ip link delete {self.interface}", shell=True)
logger.info("Phantom VPN Client остановлен")
def _restore_routing(self):
"""Восстановление оригинальной маршрутизации"""
try:
logger.info("Восстановление оригинальной маршрутизации")
# Удаление VPN маршрутов
subprocess.run(f"ip route del 0.0.0.0/1 dev {self.interface}", shell=True)
subprocess.run(f"ip route del 128.0.0.0/1 dev {self.interface}", shell=True)
# Удаление маршрута к VPN серверу
if hasattr(self, 'original_gateway'):
subprocess.run(f"ip route del {self.server_host}/32", shell=True)
logger.info("Маршрутизация восстановлена")
except Exception as e:
logger.error(f"Ошибка восстановления маршрутизации: {e}")
def show_status(self):
"""Показ статуса VPN подключения"""
print("\n=== Статус Phantom VPN ===")
print(f"Состояние: {'Подключен' if self.running else 'Отключен'}")
if self.running:
print(f"VPN сервер: {self.server_host}:{self.server_port}")
print(f"Локальный IP: {self.local_ip}")
print(f"Интерфейс: {self.interface}")
print(f"Хопов через Phantom: {self.hops}")
# Показ статистики интерфейса
try:
result = subprocess.run(f"ip -s link show {self.interface}",
shell=True, capture_output=True, text=True)
if result.returncode == 0:
print("\nСтатистика интерфейса:")
lines = result.stdout.split('\n')
for line in lines:
if 'RX:' in line or 'TX:' in line or 'bytes' in line:
print(f" {line.strip()}")
except:
pass
# Проверка внешнего IP
try:
import urllib.request
with urllib.request.urlopen('http://httpbin.org/ip', timeout=5) as response:
ip_info = response.read().decode('utf-8')
print(f"\nВнешний IP: {ip_info.strip()}")
except:
print("\nВнешний IP: Не удалось определить")
print("========================\n")
def main():
"""Главная функция"""
parser = argparse.ArgumentParser(
description='Phantom VPN Client - VPN клиент через Phantom сеть'
)
parser.add_argument(
'--server',
required=True,
help='Адрес VPN сервера'
)
parser.add_argument(
'--port',
type=int,
default=1194,
help='Порт VPN сервера (по умолчанию: 1194)'
)
parser.add_argument(
'--interface',
default='tun0',
help='Имя TUN интерфейса (по умолчанию: tun0)'
)
parser.add_argument(
'--hops',
type=int,
default=3,
help='Количество хопов через Phantom сеть (по умолчанию: 3)'
)
parser.add_argument(
'--local-ip',
default='10.8.0.2',
help='Локальный IP адрес (по умолчанию: 10.8.0.2)'
)
parser.add_argument(
'--verbose',
action='store_true',
help='Подробный вывод'
)
parser.add_argument(
'--status',
action='store_true',
help='Показать статус и выйти'
)
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Создание VPN клиента
vpn_client = PhantomVPNClient(
server_host=args.server,
server_port=args.port,
interface=args.interface,
hops=args.hops
)
vpn_client.local_ip = args.local_ip
if args.status:
vpn_client.show_status()
return
try:
# Запуск VPN клиента
if vpn_client.start():
print("\nPhantom VPN подключен. Нажмите Ctrl+C для отключения")
# Основной цикл
while vpn_client.running:
time.sleep(1)
else:
logger.error("Не удалось запустить VPN клиент")
sys.exit(1)
except KeyboardInterrupt:
logger.info("Получен сигнал прерывания, отключение VPN...")
except Exception as e:
logger.error(f"Критическая ошибка: {e}")
finally:
vpn_client.stop()
if __name__ == '__main__':
main()