sudo apt update
sudo apt install -y smcroute
phyint ibs enable ttl 64
phyint docker0 enable ttl 64
mgroup from ibs group 239.192.0.0/24
mroute from ibs group 239.192.0.0/24 to docker0
net.ipv4.conf.all.mc_forwarding = 1
net.ipv4.conf.eth1.mc_forwarding = 1
net.ipv4.conf.ibs.mc_forwarding = 2
net.ipv4.conf.docker0.mc_forwarding = 2
sudo sysctl -p /etc/sysctl.d/99-multicast.conf
#!/bin/bash
#First wait for docker0 interface
while ! ip link show docker0 >/dev/null 2>&1; do
echo "Waiting for docker0 interface..."
sleep 1
done
#Now wait for docker containers to be running
cd /home/sudoku/iec450-test
while ! docker-compose ps | grep -q "Up"; do
echo "Waiting for containers to start..."
docker-compose up -d
sleep 5
done
#Wait a bit more for network setup within containers
sleep 5
#Restart smcroute
systemctl restart smcroute
Make it executable:
sudo chmod +x /usr/local/bin/start-multicast.sh
Create systemd service /etc/systemd/system/start-multicast.service:
[Unit]
Description=Start Multicast Services
After=network-online.target docker.service
Wants=network-online.target docker.service
StartLimitIntervalSec=0
[Service]
Type=oneshot
RemainAfterExit=yes
WorkingDirectory=/home/sudoku/iec450-test
ExecStart=/usr/local/bin/start-multicast.sh
Restart=on-failure
RestartSec=10
[Install]
WantedBy=multi-user.target
Enable services:
sudo systemctl daemon-reload
sudo systemctl enable smcroute
sudo systemctl enable start-multicast
mkdir -p ~/iec450-test/logs
cd ~/iec450-test
Create monitor.py:
#!/usr/bin/env python3
import socket
import struct
import json
import os
import logging
import time
from datetime import datetime
class IEC450Monitor:
"""IEC 61162-450 Monitor"""
def __init__(self):
# Get configuration from environment
self.group = os.environ.get('GROUP', 'MISC')
self.multicast_addr = os.environ.get('MULTICAST_ADDR', '239.192.0.1')
self.port = int(os.environ.get('PORT', '60001'))
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - [%(name)s] %(message)s',
handlers=[
logging.FileHandler(f'/app/logs/{self.group}.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(self.group)
def create_socket(self):
"""Create and configure multicast socket with error handling"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 64)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1)
sock.bind(('', self.port))
group = socket.inet_aton(self.multicast_addr)
iface = socket.inet_aton('0.0.0.0')
mreq = struct.pack('4s4s', group, iface)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
self.logger.info(f"Socket created and joined multicast group {self.multicast_addr}")
return sock
except socket.error as e:
self.logger.error(f"Failed to create socket: {e}")
raise
def validate_data(self, data):
"""Validate received data based on group type"""
try:
msg = json.loads(data)
if 'timestamp' not in msg:
return False, "Missing timestamp"
if 'type' not in msg:
return False, "Missing type"
if self.group == 'TGTD':
if 'mmsi' not in msg:
return False, "Missing MMSI"
if 'position' not in msg:
return False, "Missing position"
elif self.group == 'SATD':
if 'heading' not in msg:
return False, "Missing heading"
return True, "Valid"
except json.JSONDecodeError:
return False, "Invalid JSON"
def start_monitoring(self):
"""Start monitoring multicast data with enhanced error handling"""
self.logger.info(f"Starting monitor for {self.group} on {self.multicast_addr}:{self.port}")
retry_count = 0
max_retries = 5
retry_delay = 5
while True:
sock = None
try:
sock = self.create_socket()
retry_count = 0
while True:
try:
data, addr = sock.recvfrom(1024)
data_str = data.decode()
is_valid, message = self.validate_data(data_str)
if is_valid:
self.logger.info(f"Received valid data from {addr}: {data_str}")
retry_count = 0
else:
self.logger.warning(f"Received invalid data from {addr}: {message}")
except socket.timeout:
continue
except socket.error as e:
self.logger.error(f"Socket error while receiving: {e}")
break
except Exception as e:
retry_count += 1
self.logger.error(f"Error in monitoring (attempt {retry_count}/{max_retries}): {e}")
if retry_count >= max_retries:
self.logger.critical(f"Maximum retry attempts ({max_retries}) reached. Exiting.")
raise
self.logger.info(f"Waiting {retry_delay} seconds before retrying...")
time.sleep(retry_delay)
retry_delay = min(retry_delay * 2, 60)
finally:
if sock:
try:
sock.close()
except:
pass
def main():
monitor = IEC450Monitor()
monitor.start_monitoring()
if __name__ == '__main__':
main()
Create Dockerfile:
FROM python:3.9-slim
WORKDIR /app
COPY monitor.py .
RUN mkdir -p /app/logs
CMD ["python", "monitor.py"]
Create docker-compose.yml:
version: '3'
services:
monitor-1:
build:
context: .
dockerfile: Dockerfile
container_name: iec450-monitor-1
environment:
- GROUP=TGTD
- MULTICAST_ADDR=239.192.0.2
- PORT=60002
network_mode: bridge
restart: unless-stopped
volumes:
- ./logs:/app/logs
monitor-2:
build:
context: .
dockerfile: Dockerfile
container_name: iec450-monitor-2
environment:
- GROUP=TGTD
- MULTICAST_ADDR=239.192.0.2
- PORT=60002
network_mode: bridge
restart: unless-stopped
volumes:
- ./logs:/app/logs
monitor-3:
build:
context: .
dockerfile: Dockerfile
container_name: iec450-monitor-3
environment:
- GROUP=TGTD
- MULTICAST_ADDR=239.192.0.2
- PORT=60002
network_mode: bridge
restart: unless-stopped
volumes:
- ./logs:/app/logs
Start the application:
cd ~/iec450-test
docker-compose up -d
docker-compose logs -f
ip mroute show
Expected output:
(172.31.1.111,239.192.0.2) Iif: ibs Oifs: docker0 State: resolved
netstat -g
#Check SMCRoute status
systemctl status smcroute
#View SMCRoute logs
journalctl -u smcroute
#Manually restart if needed
systemctl restart smcroute
#Check container status
docker ps -a
#View container logs
docker logs iec450-monitor-1
#Restart containers
docker-compose restart
#Check multicast forwarding settings
sysctl -a | grep mc_forwarding
#Monitor multicast traffic
tcpdump -i any ip multicast
#!/usr/bin/env python3
import socket
import json
import time
import argparse
from datetime import datetime
class IEC450Sender:
"""IEC 61162-450 Data Sender"""
GROUPS = {
'MISC': ('239.192.0.1', 60001),
'TGTD': ('239.192.0.2', 60002),
'SATD': ('239.192.0.3', 60003),
'NAVD': ('239.192.0.4', 60004),
'VDRD': ('239.192.0.5', 60005)
}
def __init__(self):
self.sockets = {}
def find_interface_ip(self):
"""Find IP address in 172.31.x.x network"""
hostname = socket.gethostname()
ip_addresses = socket.gethostbyname_ex(hostname)[2]
print("\nAvailable IP addresses:", ip_addresses)
for ip in ip_addresses:
if ip.startswith('172.31.'):
print(f"Found matching IP: {ip}")
return ip
print("No IP found in 172.31.x.x network")
return None
def create_socket(self, multicast_addr, port):
"""Create and configure multicast socket"""
# Create UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(0.2)
# Set multicast TTL to 64 as per IEC 61162-450 spec
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 64)
# Enable multicast loopback
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1)
# Find interface IP
interface_ip = self.find_interface_ip()
if interface_ip:
# Bind to specific interface
sock.bind((interface_ip, 0))
# Set multicast interface
sock.setsockopt(
socket.IPPROTO_IP,
socket.IP_MULTICAST_IF,
socket.inet_aton(interface_ip)
)
print(f"Socket bound to interface: {interface_ip}")
else:
# Fallback to default interface
sock.bind(('0.0.0.0', 0))
print("Socket bound to default interface")
return sock
def generate_test_data(self, group):
"""Generate test data for specific group"""
timestamp = datetime.now().isoformat()
if group == 'TGTD':
return {
"timestamp": timestamp,
"type": "AIS",
"mmsi": "123456789",
"position": {"lat": 60.123, "lon": 5.123},
"sog": 12.5,
"cog": 180.0
}
elif group == 'SATD':
return {
"timestamp": timestamp,
"type": "HEADING",
"heading": 182.5,
"rate": 0.5
}
return {
"timestamp": timestamp,
"type": "TEST",
"group": group
}
def send_data(self, group, interval=1.0):
"""Send data to specified multicast group"""
if group not in self.GROUPS:
print(f"Unknown group: {group}")
return
multicast_addr, port = self.GROUPS[group]
print(f"\nPreparing to send {group} data to {multicast_addr}:{port}")
sock = self.create_socket(multicast_addr, port)
print(f"Socket created successfully")
sequence = 0
try:
while True:
data = self.generate_test_data(group)
data['sequence'] = sequence
message = json.dumps(data)
sock.sendto(message.encode(), (multicast_addr, port))
print(f"Sent message {sequence} to {group} ({multicast_addr}:{port})")
sequence += 1
time.sleep(interval)
except KeyboardInterrupt:
print(f"\nStopping {group} sender...")
except Exception as e:
print(f"Error sending data: {e}")
finally:
sock.close()
def main():
parser = argparse.ArgumentParser(description='IEC 61162-450 Data Sender')
parser.add_argument('--group',
choices=IEC450Sender.GROUPS.keys(),
required=True,
help='Transmission group')
parser.add_argument('--interval',
type=float,
default=1.0,
help='Send interval in seconds')
args = parser.parse_args()
sender = IEC450Sender()
sender.send_data(args.group, args.interval)
if __name__ == '__main__':
main()
To run execute:
python.exe .\sender.py --group TGTD