Why Build Your Own Tools?

Nmap exists. Nessus exists. Metasploit exists. So why spend weeks building a network security toolkit from scratch?

Three reasons. First, understanding. Using a tool and understanding how it works are fundamentally different. Writing a port scanner directly on the socket API teaches you TCP/IP in a way that no textbook or pre-built tool can. Second, customization. Professional tools are general-purpose. A custom toolkit can be tailored to specific assessment workflows, output formats, and organizational requirements. Third, education. For a cybersecurity student, building these tools demonstrates a depth of knowledge that running someone else's scripts does not.

The NetSec Toolkit I built covers the four phases of a basic network assessment: port scanning, banner grabbing and service detection, vulnerability checking, and report generation.

Port Scanning with Python Sockets

At its core, a port scanner is simple: try to connect to every port on a target host and see which ones respond. A TCP connect scan attempts a full three-way handshake (SYN, SYN-ACK, ACK) with each port. If the handshake completes, the port is open. If the connection is refused (the target answers the SYN with a RST), the port is closed. If there is no response before the timeout, the port is most likely filtered by a firewall.

import errno
import socket
from dataclasses import dataclass


@dataclass
class PortResult:
    port: int
    state: str       # 'open', 'closed', 'filtered'
    service: str     # e.g., 'http', 'ssh', 'unknown'
    banner: str


def scan_port(host: str, port: int, timeout: float = 1.0) -> PortResult:
    """Scan a single port using TCP connect."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)

    try:
        # connect_ex returns an errno value instead of raising on failure
        result = sock.connect_ex((host, port))
        if result == 0:
            # Port is open: look up the service name, then attempt a banner grab
            try:
                service = socket.getservbyport(port, 'tcp')
            except OSError:
                service = 'unknown'  # no entry in the local services database
            banner = grab_banner(sock, port)
            return PortResult(port, 'open', service, banner)
        if result == errno.ECONNREFUSED:
            return PortResult(port, 'closed', '', '')
        # A timeout surfaces here as a non-zero errno rather than as an exception
        return PortResult(port, 'filtered', '', '')
    except socket.timeout:
        return PortResult(port, 'filtered', '', '')
    finally:
        sock.close()

The naive approach, scanning ports sequentially, is painfully slow. In the worst case, where every port is filtered and each probe waits out the full 1-second timeout, scanning all 65,535 ports would take over 18 hours. Threading is essential.

Threading for Performance

Port scanning is I/O-bound, not CPU-bound. Each scan spends most of its time waiting for a network response. This makes it ideal for threading. Python's concurrent.futures module provides a clean interface for managing a pool of worker threads:

from concurrent.futures import ThreadPoolExecutor, as_completed


def scan_host(
    host: str,
    ports: range = range(1, 1025),
    max_threads: int = 100,
    timeout: float = 1.0
) -> list[PortResult]:
    """Scan multiple ports concurrently using a thread pool."""
    results = []

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {
            executor.submit(scan_port, host, port, timeout): port
            for port in ports
        }

        for future in as_completed(futures):
            result = future.result()
            if result.state == 'open':
                results.append(result)
                print(f'  [OPEN] {result.port:>5}/tcp  {result.service:<16} {result.banner}')

    return sorted(results, key=lambda r: r.port)

With 100 threads and a 1-second timeout, scanning the first 1,024 ports takes roughly 10-15 seconds instead of 17 minutes. For a full 65,535-port scan, I increase the thread count to 200 and reduce the timeout to 0.5 seconds, bringing the total scan time to under two minutes on a local network.
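The figures above follow from simple arithmetic. The sketch below computes a rough upper bound, assuming every probe runs until it times out and that the threads work through the ports in lockstep batches. Both are simplifying assumptions, and `worst_case_seconds` is a name introduced here for illustration, not part of the toolkit.

```python
import math


def worst_case_seconds(num_ports: int, threads: int, timeout: float) -> float:
    """Upper bound on scan time if every probe runs until it times out."""
    batches = math.ceil(num_ports / threads)  # rounds of `threads` parallel probes
    return batches * timeout


print(worst_case_seconds(65_535, 1, 1.0) / 3600)  # sequential full scan, in hours
print(worst_case_seconds(1_024, 1, 1.0) / 60)     # sequential, first 1,024 ports, in minutes
print(worst_case_seconds(1_024, 100, 1.0))        # 100 threads, in seconds
print(worst_case_seconds(65_535, 200, 0.5))       # 200 threads, 0.5 s timeout, in seconds
```

The last call gives a bound of 164 seconds. Real scans on a local network finish sooner than the bound because closed ports refuse the connection immediately instead of waiting for the timeout.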

Banner Grabbing and Service Detection

Knowing a port is open is only the first step. The real question is: what is running on it? Banner grabbing sends a probe to the open port and reads the response. Many services announce themselves in their initial response:

def grab_banner(sock: socket.socket, port: int) -> str:
    """Attempt to grab the service banner from an open port."""
    try:
        sock.settimeout(2.0)

        # HTTP requires sending a request first. On TLS ports such as 443,
        # a plaintext request may yield only an error response or nothing.
        if port in (80, 8080, 8443, 443):
            host = sock.getpeername()[0]  # use the peer address as the Host value
            request = f'HEAD / HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n'
            sock.sendall(request.encode('ascii'))
        elif port in (21, 22, 25):
            pass  # FTP, SSH, and SMTP send a banner immediately upon connection
        else:
            # Generic probe: send a newline and see what comes back
            sock.sendall(b'\r\n')

        banner = sock.recv(1024).decode('utf-8', errors='replace').strip()
        return banner[:120]  # Truncate long banners
    except (socket.timeout, ConnectionError, OSError):
        return ''


def identify_service(port: int, banner: str) -> dict:
    """Parse the banner to identify the service and version."""
    info = {'service': 'unknown', 'version': '', 'os_hint': ''}

    if 'SSH' in banner:
        info['service'] = 'ssh'
        # e.g., "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.6"
        parts = banner.split()
        if parts:
            info['version'] = parts[0].split('-')[-1] if '-' in parts[0] else parts[0]
        if 'Ubuntu' in banner:
            info['os_hint'] = 'Ubuntu Linux'
        elif 'Debian' in banner:
            info['os_hint'] = 'Debian Linux'
    elif 'HTTP' in banner:
        info['service'] = 'http'
        for line in banner.split('\r\n'):
            if line.lower().startswith('server:'):
                info['version'] = line.split(':', 1)[1].strip()
    elif 'FTP' in banner:
        info['service'] = 'ftp'
        info['version'] = banner

    return info
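For SSH specifically, the banner has a fixed layout defined in RFC 4253, section 4.2: `SSH-protoversion-softwareversion SP comments`. The following is a minimal sketch of a stricter parser built on that layout. The function name `parse_ssh_banner` and its return keys are assumptions made for illustration, not part of the toolkit.

```python
def parse_ssh_banner(banner: str) -> dict:
    """Split an SSH identification string into protocol, software, and comment.

    Returns an empty dict if the text does not look like an SSH banner.
    """
    stripped = banner.strip()
    line = stripped.splitlines()[0] if stripped else ''
    if not line.startswith('SSH-'):
        return {}

    # Everything after the first space is the optional comment field
    ident, _, comment = line.partition(' ')

    # Split on the first two hyphens only, in case the software version
    # itself contains a hyphen
    parts = ident.split('-', 2)
    if len(parts) < 3:
        return {}
    return {'protocol': parts[1], 'software': parts[2], 'comment': comment}


print(parse_ssh_banner('SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.6'))
```

Keeping the protocol version, software version, and comment separate makes later checks, such as OS hints taken from the comment field, easier to write than substring tests on the whole banner.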

Vulnerability Checks

With open ports and services identified, the toolkit runs targeted vulnerability checks. These are not exploits—they are non-intrusive configuration audits that check for known weaknesses.

SSL/TLS Assessment

For any service running TLS, the toolkit checks the certificate validity, protocol version, and cipher strength:

import socket
import ssl
import time


def check_ssl(host: str, port: int = 443) -> list[dict]:
    """Assess SSL/TLS configuration for common weaknesses."""
    findings = []

    # Check for outdated protocol support by pinning the handshake to a single
    # version. Caveat: if the local OpenSSL build refuses a legacy version,
    # the attempt fails here and the server's support for it goes undetected.
    for protocol_name, version in [
        ('SSLv3', ssl.TLSVersion.SSLv3),
        ('TLSv1.0', ssl.TLSVersion.TLSv1),
        ('TLSv1.1', ssl.TLSVersion.TLSv1_1),
    ]:
        try:
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            ctx.check_hostname = False  # only the protocol version matters here
            ctx.verify_mode = ssl.CERT_NONE
            ctx.minimum_version = version
            ctx.maximum_version = version
            with ctx.wrap_socket(socket.socket(), server_hostname=host) as s:
                s.settimeout(3)
                s.connect((host, port))
                findings.append({
                    'severity': 'HIGH',
                    'finding': f'Server supports deprecated {protocol_name}',
                    'recommendation': f'Disable {protocol_name} support'
                })
        except (ssl.SSLError, OSError, ValueError):
            pass  # Handshake failed or version unavailable: treated as unsupported

    # Check certificate expiration
    ctx = ssl.create_default_context()
    try:
        with ctx.wrap_socket(socket.socket(), server_hostname=host) as s:
            s.settimeout(3)
            s.connect((host, port))
            cert = s.getpeercert()
            not_after = ssl.cert_time_to_seconds(cert['notAfter'])
            days_left = (not_after - time.time()) / 86400
            if days_left < 30:
                findings.append({
                    'severity': 'MEDIUM',
                    'finding': f'Certificate expires in {int(days_left)} days',
                    'recommendation': 'Renew SSL certificate before expiration'
                })
    except ssl.SSLCertVerificationError:
        findings.append({
            'severity': 'HIGH',
            'finding': 'SSL certificate verification failed',
            'recommendation': 'Install a valid certificate from a trusted CA'
        })

    return findings
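The cipher-strength part of the assessment is not shown in the function above. One way it could work, as a sketch under stated assumptions: `ssl.SSLSocket.cipher()` returns a `(name, protocol, secret_bits)` tuple for the negotiated cipher, and a small helper can flag weak results. The helper name `assess_cipher`, the 128-bit threshold, and the marker list are illustrative choices, not the toolkit's actual rules, and the list is not exhaustive.

```python
# Substrings of OpenSSL cipher names that indicate legacy algorithms (illustrative)
WEAK_CIPHER_MARKERS = ('RC4', 'DES', 'NULL', 'MD5', 'EXP')


def assess_cipher(cipher: tuple[str, str, int]) -> list[dict]:
    """Flag a negotiated cipher, given the tuple from ssl.SSLSocket.cipher()."""
    name, _protocol, bits = cipher
    findings = []

    if bits < 128:
        findings.append({
            'severity': 'HIGH',
            'finding': f'Negotiated cipher {name} uses only {bits} secret bits',
            'recommendation': 'Restrict the server to ciphers with at least 128-bit keys'
        })

    if any(marker in name.upper() for marker in WEAK_CIPHER_MARKERS):
        findings.append({
            'severity': 'HIGH',
            'finding': f'Negotiated cipher {name} relies on a legacy algorithm',
            'recommendation': 'Remove legacy cipher suites from the server configuration'
        })

    return findings
```

Inside `check_ssl`, the tuple would come from calling `s.cipher()` after the handshake in the certificate check. Note that this only assesses the one cipher the server chose for this client, not every cipher the server is willing to accept.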

HTTP Security Headers

Web servers often expose themselves to attacks through missing security headers. The toolkit checks for the essential ones:

import http.client

REQUIRED_HEADERS = {
    'Strict-Transport-Security': {
        'severity': 'HIGH',
        'desc': 'Missing HSTS header — vulnerable to protocol downgrade attacks'
    },
    'X-Content-Type-Options': {
        'severity': 'MEDIUM',
        'desc': 'Missing X-Content-Type-Options — vulnerable to MIME sniffing'
    },
    'X-Frame-Options': {
        'severity': 'MEDIUM',
        'desc': 'Missing X-Frame-Options — vulnerable to clickjacking'
    },
    'Content-Security-Policy': {
        'severity': 'MEDIUM',
        'desc': 'Missing CSP header — vulnerable to XSS attacks'
    },
}


def check_http_headers(host: str, port: int = 80) -> list[dict]:
    """Check for missing HTTP security headers."""
    findings = []

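    # HSTS is only honored over HTTPS; for TLS ports, http.client.HTTPSConnection
    # would be the appropriate connection class.
    conn = http.client.HTTPConnection(host, port, timeout=5)
    try:
        conn.request('HEAD', '/')
        response = conn.getresponse()
        headers = {k.lower(): v for k, v in response.getheaders()}
    finally:
        conn.close()  # release the socket even if the request fails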

    for header, info in REQUIRED_HEADERS.items():
        if header.lower() not in headers:
            findings.append({
                'severity': info['severity'],
                'finding': info['desc'],
                'recommendation': f'Add {header} response header'
            })

    # Check for information disclosure
    server = headers.get('server', '')
    if server and any(v in server.lower() for v in ['apache/', 'nginx/', 'iis/']):
        findings.append({
            'severity': 'LOW',
            'finding': f'Server header discloses software version: {server}',
            'recommendation': 'Remove or obfuscate the Server header'
        })

    return findings

SSH Configuration Audit

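The toolkit checks SSH servers for weak configurations: password authentication enabled (keys are preferred), root login permitted, and outdated key exchange algorithms. The function shown here covers the part of that audit that can be read from the version banner alone: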

import re
import socket


def check_ssh(host: str, port: int = 22) -> list[dict]:
    """Audit an SSH server using the information in its version banner."""
    findings = []

    # The server sends its identification string as soon as the connection opens
    with socket.create_connection((host, port), timeout=5) as sock:
        banner = sock.recv(1024).decode('utf-8', errors='replace')

    # Check SSH protocol version ("SSH-1.99" means both SSHv1 and SSHv2 are offered)
    if banner.startswith('SSH-1'):
        findings.append({
            'severity': 'CRITICAL',
            'finding': 'SSH server supports protocol version 1',
            'recommendation': 'Disable SSHv1, use SSHv2 only'
        })

    # Check for outdated OpenSSH versions. Compare (major, minor) as integers,
    # since a float comparison would rank a version like 8.10 below 8.9.
    version_match = re.search(r'OpenSSH[_\s](\d+)\.(\d+)', banner)
    if version_match:
        version = (int(version_match.group(1)), int(version_match.group(2)))
        if version < (8, 0):
            findings.append({
                'severity': 'HIGH',
                'finding': f'Outdated OpenSSH version: {version[0]}.{version[1]}',
                'recommendation': 'Upgrade to OpenSSH 8.0 or later'
            })

    return findings

Report Generation

Raw scan data is useful to an engineer at a terminal, but stakeholders need structured reports. The toolkit generates both a machine-readable JSON output and a human-readable summary. The summary groups findings by severity and provides actionable remediation steps:

def count_by_severity(scan_results: dict, severity: str) -> int:
    """Count findings at one severity level (minimal helper assumed by the report)."""
    return sum(1 for f in scan_results['findings'] if f['severity'] == severity)


def generate_report(scan_results: dict) -> str:
    """Generate a structured assessment report."""
    report_lines = [
        "Network Security Assessment Report",
        f"Target: {scan_results['host']}",
        f"Scan Date: {scan_results['timestamp']}",
        "=" * 60,
        "",
        "SUMMARY",
        f"  Open Ports: {len(scan_results['open_ports'])}",
        f"  Findings:   {len(scan_results['findings'])}",
        f"    Critical: {count_by_severity(scan_results, 'CRITICAL')}",
        f"    High:     {count_by_severity(scan_results, 'HIGH')}",
        f"    Medium:   {count_by_severity(scan_results, 'MEDIUM')}",
        f"    Low:      {count_by_severity(scan_results, 'LOW')}",
        "",
        "=" * 60,
        "",
    ]

    # Group findings by severity
    for severity in ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW']:
        findings = [f for f in scan_results['findings'] if f['severity'] == severity]
        if findings:
            report_lines.append(f"[{severity}]")
            for f in findings:
                report_lines.append(f"  - {f['finding']}")
                report_lines.append(f"    Remediation: {f['recommendation']}")
                report_lines.append("")

    return '\n'.join(report_lines)
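The machine-readable JSON output is not shown above. A minimal sketch of what it could look like follows, assuming `scan_results` has the same keys that `generate_report` reads (`host`, `timestamp`, `open_ports`, `findings`) and that its values are plain JSON-friendly types. The function name `report_to_json` is hypothetical.

```python
import json

SEVERITY_ORDER = {'CRITICAL': 0, 'HIGH': 1, 'MEDIUM': 2, 'LOW': 3}


def report_to_json(scan_results: dict) -> str:
    """Serialize scan results to JSON with findings ordered by severity."""
    payload = dict(scan_results)  # shallow copy so the caller's dict is untouched
    payload['findings'] = sorted(
        scan_results['findings'],
        # Unknown severity labels sort after the known ones
        key=lambda f: SEVERITY_ORDER.get(f['severity'], len(SEVERITY_ORDER)),
    )
    # default=str turns values json cannot encode (e.g. datetime) into strings
    return json.dumps(payload, indent=2, default=str)
```

Sorting by severity keeps the JSON consistent with the text report, so the two outputs can be compared side by side.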

Lessons Learned

Working at the socket level teaches networking fundamentals. Writing a connect scanner, then understanding why SYN scanning requires raw sockets and root privileges, then learning about the trade-offs between stealth and reliability: this progression builds intuition that no amount of running nmap -sV can replicate.

Threading has limits. At around 500 concurrent threads, the scanner starts producing unreliable results due to local resource exhaustion (file descriptors, network stack buffers). For production-grade scanning, asyncio and its stream API (asyncio.open_connection) would be a better choice; an HTTP client library such as aiohttp would only be needed for the HTTP-level checks. But for an assessment toolkit where scan speed is secondary to accuracy, a bounded thread pool works well.
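As a rough sketch of the asyncio alternative, the scanner below uses asyncio's own stream API, since plain TCP connects do not need an HTTP client library. The names `probe` and `scan_async` and the default limits are assumptions made for illustration, not part of the toolkit.

```python
import asyncio


async def probe(host: str, port: int, timeout: float,
                limit: asyncio.Semaphore) -> tuple[int, str]:
    """Attempt one TCP connection and classify the port."""
    async with limit:  # cap the number of simultaneous connection attempts
        try:
            _reader, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port), timeout
            )
        except asyncio.TimeoutError:
            return port, 'filtered'
        except ConnectionRefusedError:
            return port, 'closed'
        except OSError:
            return port, 'filtered'  # e.g. host or network unreachable
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            pass  # the peer may reset the connection while we close it
        return port, 'open'


async def scan_async(host: str, ports, max_concurrent: int = 500,
                     timeout: float = 1.0) -> dict[int, str]:
    """Probe all ports concurrently and return a {port: state} mapping."""
    limit = asyncio.Semaphore(max_concurrent)
    results = await asyncio.gather(
        *(probe(host, port, timeout, limit) for port in ports)
    )
    return dict(results)


if __name__ == '__main__':
    # Scan only hosts you are authorized to test; localhost is used here
    print(asyncio.run(scan_async('127.0.0.1', range(1, 1025))))
```

The semaphore plays the role of the bounded thread pool: it limits open sockets without dedicating an operating-system thread to each one. File descriptor limits still apply, so `max_concurrent` should stay below the process limit.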

Non-intrusive does not mean non-impactful. Even a connect scan generates traffic. Scanning 65,535 ports from a single source IP in rapid succession is very likely to trigger IDS/IPS alerts wherever such monitoring is in place, and may be logged as an attack. Always have explicit authorization before running any security assessment tool.

The report is the deliverable. No one reads raw JSON. The value of a security assessment is in clearly communicating what was found, how severe it is, and what to do about it. I spent as much time on the report generation module as on the scanning logic, and that was the right allocation.
