A bit about stateful firewalls and intrusion prevention¶
Estimated time to read: 32 minutes
- Originally Written: March, 2024
Overview¶
Sometimes I speak with customers who are under the impression that if they have ACLs (or "stateful" contracts in ACI) they should get rid of their firewalls. This post gives a few example scenarios of where Next Gen Firewalls (NGFWs) can provide additional benefits.
Some of the benefits¶
Stateful Inspection
- Connection state tracking to determine whether to allow or block traffic
Intrusion Prevention
- Detect and prevent known vulnerabilities from being exploited (in this example we are using Snort IPS with a Cisco FTD firewall)
Deep Packet Inspection (DPI)
- Deeper look into the packet payload to identify and control applications and to detect malicious content
Application Awareness and Policies
- Identify and allow or block traffic based on specific applications or application categories
Identity Awareness
- Enforce policies based on user identity by integrating with directory services such as Active Directory
SSL/TLS Inspection
- Decrypt and inspect traffic
Disclaimer
-
The scenarios and example code provided have been built and tested in my own lab, not on a production system or network. Use them at your own risk and always test in a lab environment.
-
Additionally, these are only a few scenarios to demonstrate what's possible and the scripts have been written to generate the desired outcome (e.g. blocking traffic)
Learning with scenarios¶
- 0. Setting up the environment
- 1. Invalid TCP Handshake
- 2. Unexpected Protocol State
- 3. Expired Connection State
- 4. Preventing port scanning attacks
- 5. Application Level Inspection
- 6. DNS Spoofing
0. Setting up the environment¶
- I have the
client.py
script running on one Linux Ubuntu VM with the IP of172.16.101.10
- The
server.py
script is on another VM with the IP of172.16.102.10
- I am using a Cisco virtual FTD firewall configured through Firepower Management Center
- The
G0/0
interface is configured as the default gateway (172.16.101.254
) for the172.16.101.0/24
subnet - The
G0/1
interface is configured as the default gateway (172.16.102.254
) for the172.16.102.0/24
subnet - The client and server VMs have reachability (i.e. they can
ping
each other)
- Running the following command on the client will create a file filled with random data which is used to send to the server for some of the scenarios
1. Invalid TCP handshake¶
This scenario only uses a single script on the client to simulate an invalid handshake. An external client sends a TCP packet with the ACK flag set, but there has been no SYN flag exchange initiating the three-way handshake. The firewall blocks the connection because the first TCP packet seen was not a SYN packet, indicating a potential TCP session hijacking attempt. Also note that the firewall sends a reset (RST) packet to the client with the server IP as the source.
Although this could be a potential malicious packet, you may also see this scenario if there is an asymmetrical flow e.g. the initial SYN and SYN-ACK packets were sent through a different firewall than the one this ACK packet is sent through.
Script¶
client.py
from scapy.all import *
import random
class TCPClient:
def __init__(self, server_ip, server_port):
self.server_ip = server_ip
self.server_port = server_port
self.local_port = random.randint(1024, 65535)
self.client_seq_num = 1000
def initiate_connection(self):
print(f"SENDING ACK PACKET")
ack_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='A', seq=self.client_seq_num, ack=1)
send(ack_packet, verbose=False)
if __name__ == "__main__":
client = TCPClient(server_ip="172.16.102.10", server_port=12345)
client.initiate_connection()
Verify¶
Packet capture on the firewall showing the dropped packet and the reset
Packet capture on the client showing the ACK and the RST
Packet capture on the server showing that no packets were received
2. Unexpected protocol state¶
A client sends a FIN packet to terminate a TCP session, but then it sends an additional data packet after the FIN. The firewall blocks the subsequent data packet as the session state indicates that the connection should be closed.
Intrusion policy required
It looks like this scenario requires the instrusion policy configured as it's the Snort IPS which catches the STREAM5_DATA_ON_CLOSED
event using the Stream preprocessor. The example script below only closes the client side of the connection so it still exists within the connection table. Without the intrusion policy set you may still be able to send subsequent packets.
In these example scenarios I've set it to Maximum Detection
.
The server script is the same as the one found in the Simple TCP Setup with the Python Scapy Library post.
The client script has been slightly altered to send a single packet after the TCP session is closed (FIN-ACK/ACK). This can be found in the terminate_connection(self,payload)
function.
Script¶
client.py
from scapy.all import *
import random
class TCPClient:
def __init__(self, server_ip, server_port, chunk_size=512):
self.server_ip = server_ip
self.server_port = server_port
self.local_port = random.randint(1024, 65535)
self.client_seq_num = 1000
self.client_ack_num = None
self.server_seq_num = None
self.chunk_size = 1400
def initiate_connection(self):
print(f"SENDING SYN PACKET")
syn_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='S', seq=self.client_seq_num)
syn_ack_response = sr1(syn_packet, verbose=False)
self.client_seq_num += 1
print(f"RECEIVING SYN ACK")
if syn_ack_response and syn_ack_response.haslayer(TCP) and syn_ack_response[TCP].flags == 'SA': # SYN-ACK flags
self.server_seq_num = syn_ack_response[TCP].seq
self.client_ack_num = self.server_seq_num + 1
ack_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='A', seq=self.client_seq_num, ack=self.server_seq_num + 1)
send(ack_packet, verbose=False)
print(f"SENDING ACK TO SYN-ACK")
print(f"THREE-WAY HANDSHAKE COMPLETE")
return True
return False
def send_payload(self, payload):
if not self.initiate_connection():
print("FAILED TO ESTABLISH A CONNECTION WITH THE SERVER")
return
bytes_sent = 0
current_packet_index = 0
total_packets = int(len(payload)/self.chunk_size)
while bytes_sent < len(payload):
print(f"SENDING PACKET {current_packet_index + 1} OF {total_packets}")
print(f"TOTAL BYTES SENT: {bytes_sent} PAYLOAD {len(payload)}")
chunk = payload[bytes_sent:bytes_sent + self.chunk_size]
psh_ack_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='PA', seq=self.client_seq_num, ack=self.client_ack_num) / Raw(load=chunk)
ack_response = sr1(psh_ack_packet, verbose=False)
print(f"UPDATING MY SEQUENCE NUMBER")
self.client_seq_num += len(chunk)
print(f"RECEIVING AN ACK FOR THIS CHUNK OF DATA")
if ack_response and ack_response.haslayer(TCP) and ack_response[TCP].flags == 'A': # ACK flag
bytes_sent += len(chunk)
current_packet_index += 1
else:
print("FAILED TO SEND PAYLOAD")
return
print("PAYLOAD SENT SUCCESSFULLY")
def terminate_connection(self,payload):
print(f"SENDING CLIENT FIN")
fin_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='FA', seq=self.client_seq_num, ack=self.client_ack_num)
fin_ack_response = sr1(fin_packet, verbose=False)
if fin_ack_response and fin_ack_response.haslayer(TCP) and fin_ack_response[TCP].flags == 'A':
print(f"RECEIVED AN ACK FOR CLIENT FIN: MY CONNECTION IS CLOSED")
print("SENDING ONE MORE DATA PACKET")
chunk = payload[0:0 + self.chunk_size]
psh_ack_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='PA', seq=self.client_seq_num, ack=self.client_ack_num) / Raw(load=chunk)
send(psh_ack_packet, verbose=0)
def send_file(self, file_path):
with open(file_path, 'rb') as f:
file_data = f.read()
self.send_payload(file_data)
self.terminate_connection(file_data)
if __name__ == "__main__":
client = TCPClient(server_ip="172.16.102.10", server_port=12345)
client.send_file("file.txt")
server.py
from scapy.all import *
import time
class TCPServer:
def __init__(self, host='0.0.0.0', port=12345):
self.host = host
self.port = port
self.server_seq_num = 5000
self.client_seq_num = None
self.client_ack_num = None
def start(self):
print(f"STARTING TCP SERVER ON {self.host}:{self.port}")
sniff(iface="ens224",filter=f"tcp and dst port {self.port}", prn=self.handle_packet, store=0)
def handle_packet(self, packet):
if packet.haslayer(IP) and packet.haslayer(TCP):
self.client_seq_num = packet[TCP].seq
self.client_ack_num = packet[TCP].ack
if packet[TCP].flags == 'S': # SYN flag
print(f"RECEIVED A SYN")
print(f"SENDING A SYN-ACK")
syn_ack_packet = IP(dst=packet[IP].src) / TCP(sport=self.port, dport=packet[TCP].sport, flags='SA', seq=self.server_seq_num, ack=self.client_seq_num + 1)
send(syn_ack_packet, verbose=0)
print(f"UPDATING MY SEQUENCE NUMBER")
self.server_seq_num += 1
elif packet[TCP].flags == 'A': # ACK flag
print(f"RECEIVED AN ACK FOR SYN-ACK")
print(f"THREE-WAY HANDSHAKE COMPLETE")
elif packet[TCP].flags == 'PA': # PSH and ACK flags
print(f"RECEIVED SOME DATA: {packet[TCP].payload.load.decode()}")
print(f"SENDING AN ACK FOR THIS CHUNK OF DATA")
ack_packet = IP(dst=packet[IP].src) / TCP(sport=self.port, dport=packet[TCP].sport, flags='A', seq=self.server_seq_num, ack=self.client_seq_num + len(packet[TCP].payload.load))
send(ack_packet, verbose=0)
elif packet[TCP].flags == 'FA': # FIN flag
print(f"RECEIVED A FIN FROM CLIENT")
print(f"SENDING ACK TO FIN-ACK ")
fin_packet = IP(dst=packet[IP].src) / TCP(sport=packet[TCP].dport, dport=packet[TCP].sport, flags='A', seq=self.server_seq_num, ack=self.client_seq_num + 1)
send(fin_packet, verbose=0)
# print(f"UPDATING MY SEQUENCE NUMBER")
# self.server_seq_num += 1
print(f"SENDING SERVER FIN")
time.sleep(1)
fin_packet = IP(dst=packet[IP].src) / TCP(sport=packet[TCP].dport, dport=packet[TCP].sport, flags='FA', seq=self.server_seq_num, ack=self.client_seq_num + 1 )
fin_ack_response = sr1(fin_packet, verbose=False)
if fin_ack_response and fin_ack_response.haslayer(TCP) and fin_ack_response[TCP].flags == 'A': # FIN-ACK flags
print(f"RECEIVED AN ACK FOR SERVER FIN: MY CONNECTION IS CLOSED")
print(f"EXITING")
exit()
if __name__ == "__main__":
server = TCPServer()
server.start()
Verify¶
The packet capture on the firewall showing the dropped final packet
The connection table showing the intrusion block
More details on why the packet was dropped
The Snort rule documentation
3. Expired connection state¶
A client was connected to the server but has been idle for longer than the configured connection timeout. The firewall drops the new packet because the connection state has expired due to timeout, and it now considers the session as new, which requires a fresh handshake. You may run into this with long running TCP connections.
The default idle connection timeout on the FTD is 1 hour. To speed up this scenario I reduced the connection timeout to 5 seconds.
FMC configuration¶
Navigate to the Access Policies
Edit the policy, in this case I just have an example default policy
Switch to the advanced tab
Edit the Threat Defence Service Policy
Add a new rule. I'm just showing the example I have configured. It applies to all interfaces (global) and the extended access list applies to any source and any destination
I've reduced the Idle Connection Timeout to 5 seconds for this scenario
The server script is the same as the one found in the Simple TCP Setup with the Python Scapy Library post.
The client script has been slightly altered and a time.sleep(30)
line has been added. This provides enough time for the connection to become idle and should result in FTD dropping subsequent packets.
Script¶
client.py
from scapy.all import *
import random
import time
class TCPClient:
def __init__(self, server_ip, server_port, chunk_size=512):
self.server_ip = server_ip
self.server_port = server_port
self.local_port = random.randint(1024, 65535)
self.client_seq_num = 1000
self.client_ack_num = None
self.server_seq_num = None
self.chunk_size = 1400
def initiate_connection(self):
print(f"SENDING SYN PACKET")
syn_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='S', seq=self.client_seq_num)
syn_ack_response = sr1(syn_packet, verbose=False)
self.client_seq_num += 1
print(f"RECEIVING SYN ACK")
if syn_ack_response and syn_ack_response.haslayer(TCP) and syn_ack_response[TCP].flags == 'SA': # SYN-ACK flags
self.server_seq_num = syn_ack_response[TCP].seq
self.client_ack_num = self.server_seq_num + 1
ack_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='A', seq=self.client_seq_num, ack=self.server_seq_num + 1)
send(ack_packet, verbose=False)
print(f"SENDING ACK TO SYN-ACK")
print(f"THREE-WAY HANDSHAKE COMPLETE")
return True
return False
def send_payload(self, payload):
if not self.initiate_connection():
print("FAILED TO ESTABLISH A CONNECTION WITH THE SERVER")
return
print("SLEEPING FOR 30 SECONDS")
time.sleep(30)
print("WAKING UP AND SENDING SOME DATA")
bytes_sent = 0
current_packet_index = 0
total_packets = int(len(payload)/self.chunk_size)
while bytes_sent < len(payload):
print(f"SENDING PACKET {current_packet_index + 1} OF {total_packets}")
print(f"TOTAL BYTES SENT: {bytes_sent} PAYLOAD {len(payload)}")
chunk = payload[bytes_sent:bytes_sent + self.chunk_size]
psh_ack_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='PA', seq=self.client_seq_num, ack=self.client_ack_num) / Raw(load=chunk)
ack_response = sr1(psh_ack_packet, verbose=False)
print(f"UPDATING MY SEQUENCE NUMBER")
self.client_seq_num += len(chunk)
print(f"RECEIVING AN ACK FOR THIS CHUNK OF DATA")
if ack_response and ack_response.haslayer(TCP) and ack_response[TCP].flags == 'A': # ACK flag
bytes_sent += len(chunk)
current_packet_index += 1
else:
print("FAILED TO SEND PAYLOAD")
return
print("PAYLOAD SENT SUCCESSFULLY")
def terminate_connection(self):
# AsyncSniffer(iface="ens224",filter=f"tcp and dst port {self.local_port}", prn=self.handle_packet, store=0)
print(f"SENDING CLIENT FIN")
fin_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='FA', seq=self.client_seq_num, ack=self.client_ack_num)
fin_ack_response = sr1(fin_packet, verbose=False)
if fin_ack_response and fin_ack_response.haslayer(TCP) and fin_ack_response[TCP].flags == 'A':
print(f"RECEIVED AN ACK FOR CLIENT FIN: MY CONNECTION IS CLOSED")
print("STILL LISTENING FOR A SERVER FIN")
sniff(iface="ens224",filter=f"tcp and dst port {self.local_port}", prn=self.handle_packet, store=0)
def handle_packet(self, packet):
if packet.haslayer(IP) and packet.haslayer(TCP) and packet[TCP].flags == 'FA':
print(f"RECEIVED A FIN FROM SERVER")
print(f"SENDING ACK TO FIN-ACK ")
self.server_seq_num = packet[TCP].seq
fin_packet = IP(dst=packet[IP].src) / TCP(sport=packet[TCP].dport, dport=packet[TCP].sport, flags='A', seq=self.client_seq_num + 1, ack=self.server_seq_num + 1)
send(fin_packet, verbose=0)
print(f"EXITING")
exit()
def send_file(self, file_path):
with open(file_path, 'rb') as f:
file_data = f.read()
self.send_payload(file_data)
self.terminate_connection()
if __name__ == "__main__":
client = TCPClient(server_ip="172.16.102.10", server_port=12345)
client.send_file("file.txt")
server.py
from scapy.all import *
import time
class TCPServer:
def __init__(self, host='0.0.0.0', port=12345):
self.host = host
self.port = port
self.server_seq_num = 5000
self.client_seq_num = None
self.client_ack_num = None
def start(self):
print(f"STARTING TCP SERVER ON {self.host}:{self.port}")
sniff(iface="ens224",filter=f"tcp and dst port {self.port}", prn=self.handle_packet, store=0)
def handle_packet(self, packet):
if packet.haslayer(IP) and packet.haslayer(TCP):
self.client_seq_num = packet[TCP].seq
self.client_ack_num = packet[TCP].ack
if packet[TCP].flags == 'S': # SYN flag
print(f"RECEIVED A SYN")
print(f"SENDING A SYN-ACK")
syn_ack_packet = IP(dst=packet[IP].src) / TCP(sport=self.port, dport=packet[TCP].sport, flags='SA', seq=self.server_seq_num, ack=self.client_seq_num + 1)
send(syn_ack_packet, verbose=0)
print(f"UPDATING MY SEQUENCE NUMBER")
self.server_seq_num += 1
elif packet[TCP].flags == 'A': # ACK flag
print(f"RECEIVED AN ACK FOR SYN-ACK")
print(f"THREE-WAY HANDSHAKE COMPLETE")
elif packet[TCP].flags == 'PA': # PSH and ACK flags
print(f"RECEIVED SOME DATA: {packet[TCP].payload.load.decode()}")
print(f"SENDING AN ACK FOR THIS CHUNK OF DATA")
ack_packet = IP(dst=packet[IP].src) / TCP(sport=self.port, dport=packet[TCP].sport, flags='A', seq=self.server_seq_num, ack=self.client_seq_num + len(packet[TCP].payload.load))
send(ack_packet, verbose=0)
elif packet[TCP].flags == 'FA': # FIN flag
print(f"RECEIVED A FIN FROM CLIENT")
print(f"SENDING ACK TO FIN-ACK ")
fin_packet = IP(dst=packet[IP].src) / TCP(sport=packet[TCP].dport, dport=packet[TCP].sport, flags='A', seq=self.server_seq_num, ack=self.client_seq_num + 1)
send(fin_packet, verbose=0)
# print(f"UPDATING MY SEQUENCE NUMBER")
# self.server_seq_num += 1
print(f"SENDING SERVER FIN")
time.sleep(1)
fin_packet = IP(dst=packet[IP].src) / TCP(sport=packet[TCP].dport, dport=packet[TCP].sport, flags='FA', seq=self.server_seq_num, ack=self.client_seq_num + 1 )
fin_ack_response = sr1(fin_packet, verbose=False)
if fin_ack_response and fin_ack_response.haslayer(TCP) and fin_ack_response[TCP].flags == 'A': # FIN-ACK flags
print(f"RECEIVED AN ACK FOR SERVER FIN: MY CONNECTION IS CLOSED")
print(f"EXITING")
exit()
if __name__ == "__main__":
server = TCPServer()
server.start()
Verify¶
Packet capture on the firewall showing the dropped packets after the connection timeout
Packet capture on the client VM showing RSTs received from the firewall
4. Preventing port scanning attacks¶
Disclaimer
I ran this scenario in my lab with nmap
only scanning a subnet that I own. I would advise against running this on any network you don't manage or outside of a lab environment.
This scenario doesn't use a client or server script to send traffic, instead it looks at how an NGFW can detect and block port scanning.
Script¶
I ran the following nmap
command from the client VM to scan the server subnet.
-sS
: This option performs a SYN scan, which is less intrusive and considered "stealthy"-p:
Specify ports or port ranges to scan (e.g., -p 1-65535 to scan all ports)-T4:
This option sets the timing template to "aggressive" which speeds up the scan-A:
Enables OS detection, version detection, script scanning, and traceroute
Navigate to the Access Policies
Edit the policy, in this case I just have an example default policy
Switch to the Advanced tab
Edit the Threat Detection policy
For this example I put the portscan into prevention rather than detection
You can configure various sensititvity levels
I also put the IPS policy as maximum detection level for this example
Verify¶
Connection events showing the intrusion blocks for the port scans
The specific events detected
FTD uses Snort to monitor network traffic for suspicious activity. When nmap
scans it sends various scripts and probes to the target network to gather information. These activities can trigger intrusion events in FMC because they may resemble the techniques used by malicious actors to probe or attack a network.
Here are explanations for some of the events above:
-
STREAM5_BAD_RST:
This indicates that Snort has seen a TCP RST packet that doesn't appear to be part of any established TCP connection in the network's traffic -
PROTOCOL-ICMP Unusual PING detected
: Nmap uses different types of ICMP echo requests to determine whether a host is up, for example pings with unusual payloads or sizes -
DECODE_ICMP_PING_NMAP
: This is a very specific alert to Nmap's ICMP echo requests -
PROTOCOL-SNMP request tcp
: Nmap can perform scans on the SNMP service over TCP which can trigger this alert -
INDICATOR-SHELLCODE x86 inc ebx NOOP
: This alert indicates that the Snort has detected a sequence of bytes that looks like a shellcode NOP (no-operation) sled, which is commonly used in buffer overflow attacks to provide a "landing zone" for the instruction pointer. This can be triggered by Nmap's OS detection feature.
5. Application level inspection¶
In this scenario the client sends malicious data in the HTTP post which can cause issues when user input is not properly sanitized and the input is used directly in an SQL query.
The payload, "username=admin&password=anything' OR '1'='1"
, in the example below is a part of an HTTP query string sent to the server.
When this input is improperly handled and concatenated directly into an SQL command, the resulting SQL query might look something like this:
SELECT * FROM users WHERE username = 'admin' AND password = 'anything' OR '1'='1';
Note the ' OR '1'='1
at the end of the payload. This is the injected part of the SQL statement. Since '1'='1'
is a logical expression that is always true, it effectively negates the need for the username and password conditions to be true.
The result of this injection is that the SQL statement now has a condition that is always true ('1'='1'
), which means the WHERE clause will always evaluate to true, and the SQL query will return all rows from the users table. In the context of a login system, this could allow an attacker to bypass authentication checks completely and potentially gain unauthorized access to the application.
For another example see Little Bobby Tables.
The server script is the same as the one found in the Simple TCP Setup with the Python Scapy Library post.
The client script has been altered and rather than sending file from a data, an HTTP packet is created with the malicious payload.
Script¶
client.py
from scapy.all import *
import random
class TCPClient:
def __init__(self, server_ip, server_port, chunk_size=512):
self.server_ip = server_ip
self.server_port = server_port
self.local_port = random.randint(1024, 65535)
self.client_seq_num = 1000
self.client_ack_num = None
self.server_seq_num = None
self.chunk_size = 1400
def initiate_connection(self):
print(f"SENDING SYN PACKET")
syn_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='S', seq=self.client_seq_num)
syn_ack_response = sr1(syn_packet, verbose=False)
self.client_seq_num += 1
print(f"RECEIVING SYN ACK")
if syn_ack_response and syn_ack_response.haslayer(TCP) and syn_ack_response[TCP].flags == 'SA': # SYN-ACK flags
self.server_seq_num = syn_ack_response[TCP].seq
self.client_ack_num = self.server_seq_num + 1
ack_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='A', seq=self.client_seq_num, ack=self.server_seq_num + 1)
send(ack_packet, verbose=False)
print(f"SENDING ACK TO SYN-ACK")
print(f"THREE-WAY HANDSHAKE COMPLETE")
return True
return False
def send_payload(self):
if not self.initiate_connection():
print("FAILED TO ESTABLISH A CONNECTION WITH THE SERVER")
return
sql_injection_payload = "username=admin&password=anything' OR '1'='1"
# The HTTP request with an SQL injection attempt in the body
http_request = f"POST /submit_form HTTP/1.1\r\n"
http_request += f"Host: {self.server_ip}\r\n"
http_request += "Content-Type: application/x-www-form-urlencoded\r\n"
http_request += f"Content-Length: {len(sql_injection_payload)}\r\n"
http_request += "Connection: keep-alive\r\n\r\n" # End of HTTP headers
http_request += sql_injection_payload # The body of the request containing the injection attempt
print(f"SENDING HTTP REQUEST WITH PAYLOAD")
psh_ack_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='PA', seq=self.client_seq_num, ack=self.client_ack_num) / Raw(load=http_request)
ack_response = sr1(psh_ack_packet, verbose=False)
self.client_seq_num += len(http_request.encode('utf-8'))
if ack_response and ack_response.haslayer(TCP) and ack_response[TCP].flags == 'A': # ACK flag
print("HTTP REQUEST SENT SUCCESSFULLY")
else:
print("FAILED TO SEND HTTP REQUEST")
def terminate_connection(self):
print(f"SENDING CLIENT FIN")
fin_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='FA', seq=self.client_seq_num, ack=self.client_ack_num)
fin_ack_response = sr1(fin_packet, verbose=False)
if fin_ack_response and fin_ack_response.haslayer(TCP) and fin_ack_response[TCP].flags == 'A':
print(f"RECEIVED AN ACK FOR CLIENT FIN: MY CONNECTION IS CLOSED")
print("STILL LISTENING FOR A SERVER FIN")
sniff(iface="ens224",filter=f"tcp and dst port {self.local_port}", prn=self.handle_packet, store=0)
def handle_packet(self, packet):
if packet.haslayer(IP) and packet.haslayer(TCP) and packet[TCP].flags == 'FA':
print(f"RECEIVED A FIN FROM SERVER")
print(f"SENDING ACK TO FIN-ACK ")
self.server_seq_num = packet[TCP].seq
fin_packet = IP(dst=packet[IP].src) / TCP(sport=packet[TCP].dport, dport=packet[TCP].sport, flags='A', seq=self.client_seq_num + 1, ack=self.server_seq_num + 1)
send(fin_packet, verbose=0)
print(f"EXITING")
exit()
if __name__ == "__main__":
# Example SQL injection payload
client = TCPClient(server_ip="172.16.102.10", server_port=12345) # Assuming the HTTP server is on port 80
client.send_payload()
client.terminate_connection()
server.py
from scapy.all import *
import random
class TCPClient:
def __init__(self, server_ip, server_port, chunk_size=512):
self.server_ip = server_ip
self.server_port = server_port
self.local_port = random.randint(1024, 65535)
self.client_seq_num = 1000
self.client_ack_num = None
self.server_seq_num = None
self.chunk_size = 1400
def initiate_connection(self):
print(f"SENDING SYN PACKET")
syn_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='S', seq=self.client_seq_num)
syn_ack_response = sr1(syn_packet, verbose=False)
self.client_seq_num += 1
print(f"RECEIVING SYN ACK")
if syn_ack_response and syn_ack_response.haslayer(TCP) and syn_ack_response[TCP].flags == 'SA': # SYN-ACK flags
self.server_seq_num = syn_ack_response[TCP].seq
self.client_ack_num = self.server_seq_num + 1
ack_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='A', seq=self.client_seq_num, ack=self.server_seq_num + 1)
send(ack_packet, verbose=False)
print(f"SENDING ACK TO SYN-ACK")
print(f"THREE-WAY HANDSHAKE COMPLETE")
return True
return False
def send_payload(self, payload):
if not self.initiate_connection():
print("FAILED TO ESTABLISH A CONNECTION WITH THE SERVER")
return
# Craft the HTTP request with an SQL injection attempt in the body
http_request = f"POST /submit_form HTTP/1.1\r\n"
http_request += f"Host: {self.server_ip}\r\n"
http_request += "Content-Type: application/x-www-form-urlencoded\r\n"
http_request += f"Content-Length: {len(payload)}\r\n"
http_request += "Connection: keep-alive\r\n\r\n" # End of HTTP headers
http_request += payload # The body of the request containing the injection attempt
print(f"SENDING HTTP REQUEST WITH PAYLOAD")
psh_ack_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='PA', seq=self.client_seq_num, ack=self.client_ack_num) / Raw(load=http_request)
ack_response = sr1(psh_ack_packet, verbose=False)
self.client_seq_num += len(http_request.encode('utf-8'))
if ack_response and ack_response.haslayer(TCP) and ack_response[TCP].flags == 'A': # ACK flag
print("HTTP REQUEST SENT SUCCESSFULLY")
else:
print("FAILED TO SEND HTTP REQUEST")
def terminate_connection(self):
# AsyncSniffer(iface="ens224",filter=f"tcp and dst port {self.local_port}", prn=self.handle_packet, store=0)
print(f"SENDING CLIENT FIN")
fin_packet = IP(dst=self.server_ip) / TCP(sport=self.local_port, dport=self.server_port, flags='FA', seq=self.client_seq_num, ack=self.client_ack_num)
fin_ack_response = sr1(fin_packet, verbose=False)
if fin_ack_response and fin_ack_response.haslayer(TCP) and fin_ack_response[TCP].flags == 'A':
print(f"RECEIVED AN ACK FOR CLIENT FIN: MY CONNECTION IS CLOSED")
print("STILL LISTENING FOR A SERVER FIN")
sniff(iface="ens224",filter=f"tcp and dst port {self.local_port}", prn=self.handle_packet, store=0)
def handle_packet(self, packet):
if packet.haslayer(IP) and packet.haslayer(TCP) and packet[TCP].flags == 'FA':
print(f"RECEIVED A FIN FROM SERVER")
print(f"SENDING ACK TO FIN-ACK ")
self.server_seq_num = packet[TCP].seq
fin_packet = IP(dst=packet[IP].src) / TCP(sport=packet[TCP].dport, dport=packet[TCP].sport, flags='A', seq=self.client_seq_num + 1, ack=self.server_seq_num + 1)
send(fin_packet, verbose=0)
print(f"EXITING")
exit()
def send_file(self, file_path):
with open(file_path, 'rb') as f:
file_data = f.read()
self.send_payload(file_data)
self.terminate_connection()
if __name__ == "__main__":
# Example SQL injection payload
sql_injection_payload = "username=admin&password=anything' OR '1'='1"
client = TCPClient(server_ip="172.16.102.10", server_port=12345) # Assuming the HTTP server is on port 80
client.send_payload(sql_injection_payload)
client.terminate_connection()
Verify¶
The connection table showing the blocked connection
The intrusion events triggered by this flow. Note that the IPS has also identified another potential issue since our payload is also sending the user credentials.
How is the SQL injection attack identified?¶
In this scenario the FTD NGFW is using the Snort IPS to detect the attack. The rules can be found on the Intrusion Rules page.
This is the specific rule used in scenario and an explanation of the components.
alert tcp $EXTERNAL_NET any -> $HOME_NET $HTTP_PORTS ( msg:"SQL 1 = 1 - possible sql injection attempt"; flow:to_server,established; http_client_body; content:"1=1",fast_pattern,nocase; pcre:"/or[\s\x2f\x2A]+1=1/i"; metadata:policy balanced-ips drop,policy max-detect-ips drop,policy security-ips drop; service:http; reference:url,attack.mitre.org/techniques/T1190; reference:url,ferruh.mavituna.com/sql-injection-cheatsheet-oku/; classtype:web-application-attack; sid:27287; rev:5; )
alert tcp
- Tells Snort to generate an alert for TCP traffic.
-
$EXTERNAL_NET any -> $HOME_NET $HTTP_PORTS
- The rule applies to traffic from any port on external networks ($EXTERNAL_NET any) directed towards HTTP ports on the home network ($HOME_NET $HTTP_PORTS). The variables $EXTERNAL_NET, $HOME_NET, and $HTTPS_PORTS are predefined in the Snort configuration file. In my lab $EXTERNAL_NET and $HOME_NET are defined as
any
network
- The rule applies to traffic from any port on external networks ($EXTERNAL_NET any) directed towards HTTP ports on the home network ($HOME_NET $HTTP_PORTS). The variables $EXTERNAL_NET, $HOME_NET, and $HTTPS_PORTS are predefined in the Snort configuration file. In my lab $EXTERNAL_NET and $HOME_NET are defined as
-
msg:"SQL 1 = 1 - possible sql injection attempt"
- This part of the rule specifies the message that will be logged when the rule is triggered
flow:to_server,established
- Checks the direction and state of the traffic. Here it specifies that the rule should only match traffic that is flowing to the server (not from the server) and is part of an already established connection
http_client_body
- Tells Snort to search for the content in the body of the HTTP client request, not in the headers
content:"1=1",fast_pattern,nocase
- This part of the rule looks for the content
"1=1"
within the HTTP client body, which is a common pattern in SQL injection attacks. Thefast_pattern
modifier helps Snort to use this string for fast pattern matching, andnocase
makes the search case-insensitive
- This part of the rule looks for the content
pcre:"/or[\s\x2f\x2A]+1=1/i";
- The
pcre
keyword allows the rule to use Perl-compatible regular expressions. - What this specific regex pattern is looking for:
or
: This matches the literal string "or" which is often used in SQL queries to combine conditions -[\s\x2f\x2A]+
: This is a character class that matches:\s
: Any whitespace character (e.g., spaces, tabs, newlines)\x2f
: The hexadecimal representation of the forward slash character\x2A
: The hexadecimal representation of the asterisk character+
: The regex will match one or more occurrences of the characters in the class
1=1
: Matches the literal string "1=1", which is always true and often used in SQL injection to manipulate the logic of a SQL WHERE clause./i
: This is a modifier that makes the regex search case-insensitive. It means that the pattern will match regardless of whether the characters are upper or lower case (e.g., "OR", "or", "Or", "oR").
- The
metadata:policy balanced-ips drop,policy max-detect-ips drop,policy security-ips drop;
- The metadata field provides additional information for Snort’s processing engine. In this case, it suggests that the policy action should be to drop the packets
service:http;
- Specifies that the rule applies to HTTP services
reference:url,attack.mitre.org/techniques/T1190;
- Provides a reference URL to MITRE ATT&CK's technique T1190, which details common tactics for exploitation of public-facing applications
reference:url,ferruh.mavituna.com/sql-injection-cheatsheet-oku/;
- Another reference URL but this seems to no longer have the cheatsheet. You can use the Wayback Machine to find an older copy
classtype:web-application-attack;
- This assigns the alert to the "web-application-attack" classification type, which is a category of attack defined in Snort's configuration
sid:27287;
- The Snort ID (SID) for this specific rule
rev:5;
- Indicates the revision number of the rule and shows that this rule has been updated or modified five times
6. DNS spoofing¶
This scenario is a little hard to simulate but here's an overview in case you're interested. In this example a malicious actor responds to a DNS request with a forged response. The idea is to divert this traffic to a malicious IP address that the actor owns.
How a stateful firewall can help:
- Request-Response Tracking
- The firewall tracks outbound DNS requests from hosts within the network and logs the transaction IDs and source ports used in those requests.
- Validation of Responses
- When a DNS response comes in, the firewall checks its state table to validate that the response has a matching transaction ID and source port as the request. If there's no match, the response is likely unsolicited and could be part of an attack.
- Timing Checks
- The firewall can also check the timing of the response. If it comes suspiciously quickly or after an unusual delay, the firewall may flag the response as potentially malicious, especially if the response arrives after the legitimate DNS server's response.
- Response Source Verification
- The firewall can ensure that the source IP of the DNS response matches the IP of the DNS server to which the request was sent. If it doesn't match, the firewall can block the response.
- Rate Limiting
- The firewall can implement rate limiting for DNS responses to prevent a flood of fake DNS responses from overwhelming the legitimate responses.