Files
paqctl/gfk/server/vio_server.py
SamNet-dev 955e819677 feat: migrate paqctl to self-hosted Gitea (git.samnet.dev)
All repository URLs updated from GitHub (SamNet-dev/paqctl) to
Gitea (git.samnet.dev/SamNet-dev/paqctl). Third-party references
(hanselime/paqet, Xray, microsocks) remain on GitHub.
2026-02-23 23:46:21 -06:00

232 lines
7.1 KiB
Python

from scapy.all import AsyncSniffer,IP,TCP,Raw,conf
import asyncio
import random
import parameters
import logging
import time
# Setup basic logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("VioServer")
vps_ip = parameters.vps_ip
vio_tcp_server_port = parameters.vio_tcp_server_port
vio_udp_server_port = parameters.vio_udp_server_port
quic_local_ip = parameters.quic_local_ip
quic_server_port = parameters.quic_server_port
tcp_flags = getattr(parameters, 'tcp_flags', 'AP')
global client_ip # obtained during sniffing
global client_port # obtained during sniffing
client_ip = "1.1.1.1"
client_port = 443
tcp_options=[
('MSS', 1280), # Maximum Segment Size
('WScale', 8), # Window Scale
('SAckOK', ''), # Selective ACK Permitted
]
async def async_sniff_realtime(qu1):
logger.info("sniffer started")
try:
def process_packet(packet):
# logger.info(f"sniffed before if at {time.time()}")
# Check flags using 'in' to handle different flag orderings (AP vs PA)
flags = str(packet[TCP].flags) if packet.haslayer(TCP) else ''
if packet.haslayer(TCP) and packet[TCP].dport == vio_tcp_server_port and 'A' in flags and 'P' in flags:
data1 = packet[TCP].load
client_ip = packet[IP].src
client_port = packet[TCP].sport
qu1.put_nowait( (data1,client_ip,client_port) )
# logger.info(f"sniffed on tcp : {client_ip} {client_port} at {time.time()}")
async def start_sniffer():
sniffer = AsyncSniffer(prn=process_packet,
filter=f"tcp and dst host {vps_ip} and dst port {vio_tcp_server_port}",
store=False)
sniffer.start()
return sniffer
sniffer = await start_sniffer()
return sniffer
except Exception as e:
logger.info(f"sniff Generic error: {e}....")
async def forward_vio_to_quic(qu1, transport):
global client_ip
global client_port
logger.info(f"Task vio to Quic started")
addr = (quic_local_ip,quic_server_port)
# addr = ("192.168.1.140",quic_server_port)
try:
while True:
# update client_ip, client_port from the queue
data,client_ip,client_port = await qu1.get()
# logger.info(f"data qu1 fetched {data} at {time.time()}")
if(data == None):
break
transport.sendto(data , addr)
# logger.info(f"data sent to udp {data} -> {addr} at {time.time()}")
# qu1.task_done()
except Exception as e:
logger.info(f"Error forwarding vio to Quic: {e}")
finally:
logger.info(f"Task vio to Quic Ended.")
basepkt = IP(src=vps_ip) / TCP(sport=vio_tcp_server_port, seq=1, flags=tcp_flags, ack=0, options=tcp_options) / Raw(load=b"")
skt = conf.L3socket()
def send_to_violated_TCP(binary_data,client_ip,client_port):
# logger.info(f"client ip = {client_ip}")
new_pkt = basepkt.copy()
new_pkt[IP].dst = client_ip
new_pkt[TCP].dport = client_port
new_pkt[TCP].seq = random.randint(1024,1048576)
new_pkt[TCP].ack = random.randint(1024,1048576)
new_pkt[TCP].load = binary_data
skt.send(new_pkt)
async def forward_quic_to_vio(protocol):
logger.info(f"Task QUIC to vio started")
global client_ip
global client_port
try:
while True:
data = await protocol.queue.get() # Wait for data from UDP
if(data == None):
break
send_to_violated_TCP(data,client_ip,client_port)
# logger.info(f"data send to tcp {data} at {time.time()}")
except Exception as e:
logger.info(f"Error forwarding QUIC to vio: {e}")
finally:
logger.info(f"Task QUIC to vio Ended.")
async def start_udp_server(qu1):
while True:
try:
logger.warning(f"violated tcp:{vio_tcp_server_port} -> quic {quic_local_ip}:{quic_server_port} -> ")
loop = asyncio.get_event_loop()
transport, udp_protocol = await loop.create_datagram_endpoint(
lambda: UdpProtocol(),
local_addr=("0.0.0.0", vio_udp_server_port),
remote_addr=(quic_local_ip, quic_server_port)
# remote_addr=("192.168.1.140", quic_server_port)
)
task1 = asyncio.create_task(forward_quic_to_vio(udp_protocol))
task2 = asyncio.create_task(forward_vio_to_quic(qu1,transport))
while True:
await asyncio.sleep(0.02) # this make async loop to switch better between process
if(udp_protocol.has_error):
task1.cancel()
task2.cancel()
await asyncio.sleep(1)
logger.info(f"all task cancelled")
break
except Exception as e:
logger.info(f"vioServer ERR: {e}")
finally:
transport.close()
await asyncio.sleep(0.5)
transport.abort()
logger.info("aborting transport ...")
await asyncio.sleep(1.5)
logger.info("vio inner finished")
class UdpProtocol:
def __init__(self):
self.transport = None
self.has_error = False
self.queue = asyncio.Queue()
def connection_made(self, transport):
logger.info("NEW DGRAM socket created")
logger.info(transport.get_extra_info('socket'))
self.transport = transport
def pause_writing(self):
pass # UDP doesn't need flow control, but we had to implement
def resume_writing(self):
pass # UDP doesn't need flow control, but we had to implement
def datagram_received(self, data, addr):
self.queue.put_nowait(data)
# logger.info(f"data received from udp {data} at {time.time()}")
def error_received(self, exc):
logger.info(f"UDP error received: {exc}")
self.has_error = True
if(self.transport):
self.transport.close()
logger.info("UDP transport closed")
def connection_lost(self, exc):
logger.info(f"UDP lost. {exc}")
self.has_error = True
if(self.transport):
self.transport.close()
logger.info("UDP transport closed")
async def run_vio_server():
sniffer = None
try:
qu1 = asyncio.Queue()
sniffer = await async_sniff_realtime(qu1)
await asyncio.gather(
start_udp_server(qu1),
return_exceptions=True
)
logger.info("end ?")
except SystemExit as e:
logger.info(f"Caught SystemExit: {e}")
except asyncio.CancelledError as e:
logger.info(f"cancelling error: {e}")
except ConnectionError as e:
logger.info(f"Connection error: {e}")
except Exception as e:
logger.info(f"Generic error: {e}")
finally:
if sniffer is not None:
sniffer.stop()
logger.info("stop sniffer")
if __name__ == "__main__":
asyncio.run(run_vio_server())