飞塔特征库离线升级

作者:waMoYu 发布时间: 2026-05-02 阅读量:3 评论数:0

一、手动升级

  • TFTP服务器IP:192.168.10.2

应用控制

execute restore ips tftp apdb_OS6.2.0_35.00187.APDB.pkg 192.168.10.2

互联网服务数据库

💡 Tip

会提示Command fail. Return code 49​实际是更新成功了的,

execute restore other-objects tftp ffdb_fos62_00007.04418.pkg 192.168.10.2

IPS

execute restore ips tftp nids_OS7.4.0_36.00204.NIDS.pkg 192.168.10.4

AV

execute restore av tftp vsigupdate-OS7.4.0_4.04098.AVAI.pkg 192.168.10.4
execute restore av tftp vsigupdate-OS7.4.0_93.06995.ETDB.High.pkg 192.168.10.4
execute restore av tftp vsigupdate-OS7.4.0_93.06995.MMDB.pkg 192.168.10.4

移动端恶意软件特征库

execute restore av tftp vsigupdate-OS6.2.0_93.06840.MMDB.pkg 192.168.10.2

僵尸网络IP数据库

execute restore ips tftp IRISUpdate-OS6.2.0_5.000-fgt.pkg 192.168.10.2

二、脚本升级

每次手动更新太麻烦了,使用DeepSeek编写了一个py脚本进行自动更新,这样每次只需下载特征库升级包即可。

使用方法

  1. 在自定义文件夹中创建py和bat文件,并且将脚本内容粘贴进去。

    py名字最好为:update.py,如果不是请自行修改bat脚本中py脚本名字。

  2. 将下载的离线升级包放在脚本所在文件夹

  3. 双击bat脚本执行升级即可

前置模块安装命令

pip install paramiko -i https://pypi.tuna.tsinghua.edu.cn/simple

Python脚本

⚠️注意修改脚本中飞塔的ip和账号密码!

#!/usr/bin/env python3
"""
自动根据当前目录下的 .pkg 文件生成并执行恢复命令。
- 自动检测本机内网 IP 和 TFTP 服务
- 逐条在远程设备执行恢复命令,自动确认覆盖,等待 "OK" 完成
- 全部恢复完成后,执行 get system status 并提取关键版本信息
- 统计成功/失败数量

用法:
    python deploy_pkg.py                           # 使用内置默认值
    python deploy_pkg.py --host 10.0.0.1 --user admin --password pass
"""

import os
import sys
import glob
import socket
import time
import argparse
import paramiko

# ------------------- 内置默认配置 -------------------
DEFAULT_HOST = "192.168.1.100"     # 远程设备 IP
DEFAULT_USER = "admin"              # SSH 用户名
DEFAULT_PASSWORD = "admin"          # SSH 密码
# ----------------------------------------------------

def get_local_inner_ip():
    """获取本机内网 IPv4 地址(优先私有地址)"""
    hostname = socket.gethostname()
    try:
        ip_list = socket.gethostbyname_ex(hostname)[2]
    except socket.gaierror:
        ip_list = []
    valid_ips = [ip for ip in ip_list if not ip.startswith('127.')]
    if not valid_ips:
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s.connect(('10.255.255.255', 1))
            valid_ips = [s.getsockname()[0]]
            s.close()
        except Exception:
            return None
    private_ips = [ip for ip in valid_ips
                   if ip.startswith('10.') or
                      ip.startswith('192.168.') or
                      (ip.startswith('172.') and 16 <= int(ip.split('.')[1]) <= 31)]
    return private_ips[0] if private_ips else valid_ips[0]

def check_tftp_service(local_ip, port=69, timeout=2):
    """检测本机 TFTP 服务是否在运行(UDP 69)"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(timeout)
    rrq_packet = b'\x00\x01' + b'test\x00' + b'netascii\x00'
    try:
        sock.sendto(rrq_packet, (local_ip, port))
        data, addr = sock.recvfrom(1024)
        return True
    except socket.timeout:
        return False
    except Exception as e:
        print(f"TFTP 检测异常: {e}")
        return False
    finally:
        sock.close()

def classify_pkg(filename):
    """根据文件名前缀返回类别"""
    base = os.path.basename(filename)
    if base.startswith('ffdb_'):
        return 'other-objects'
    if base.startswith('nids_') or base.startswith('apdb_'):
        return 'ips'
    if base.startswith('vsigupdate-'):
        return 'av'
    return None

def generate_commands(pkg_files, local_ip):
    """生成 (类别, 完整命令) 的列表"""
    commands = []
    for f in pkg_files:
        category = classify_pkg(f)
        if category is None:
            print(f"警告: 跳过无法识别的文件 {f}")
            continue
        cmd = f"execute restore {category} tftp {os.path.basename(f)} {local_ip}"
        commands.append((category, cmd))
    return commands

def parse_status_lines(text):
    """从 get system status 输出中提取六行关键版本信息"""
    targets = [
        "Virus-DB:",
        "Extended DB:",
        "AV AI/ML Model:",
        "IPS-DB:",
        "IPS-ETDB:",
        "APP-DB:"
    ]
    lines = []
    for line in text.splitlines():
        stripped = line.strip()
        for t in targets:
            if stripped.startswith(t):
                lines.append(stripped)
                break
    return lines

def ssh_execute_interactive(host, user, password, commands, cmd_timeout=120):
    """
    交互式 SSH 会话,逐条执行恢复命令。
    返回 (results, status_lines)
      results: 每条命令的执行记录,包含 'ok' 字段
      status_lines: get system status 提取的关键版本行
    """
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        client.connect(hostname=host, username=user, password=password,
                       look_for_keys=False, allow_agent=False)
        print(f"成功连接到 {host}")
    except Exception as e:
        print(f"SSH 连接失败: {e}")
        sys.exit(1)

    # ----- 第一阶段:交互式执行恢复命令 -----
    channel = client.invoke_shell()
    time.sleep(0.5)
    # 清空初始输出
    while channel.recv_ready():
        channel.recv(4096)

    results = []
    total = len(commands)
    for idx, (category, cmd) in enumerate(commands, 1):
        print(f"\n({idx}/{total}) 执行 [{category}]: {cmd}")
        channel.sendall(cmd + '\n')
        time.sleep(0.3)

        output = b""
        start_time = time.time()
        sent_y = False
        ok_received = False

        while True:
            if channel.recv_ready():
                chunk = channel.recv(4096)
                output += chunk
                if not sent_y and b"Do you want to continue? (y/n)" in output:
                    time.sleep(0.3)
                    channel.sendall('y\n')
                    sent_y = True
                    print("  自动确认: y")
                if sent_y and b"OK" in output:
                    ok_received = True
            else:
                time.sleep(0.5)

            elapsed = time.time() - start_time
            if ok_received and not channel.recv_ready():
                print("  检测到 OK,命令完成")
                break
            if elapsed > cmd_timeout:
                if not ok_received:
                    print(f"  警告: 未在 {cmd_timeout}s 内检测到 OK,继续下一条")
                break

        out_str = output.decode('utf-8', errors='ignore').strip()
        print(out_str)
        results.append({
            'category': category,
            'command': cmd,
            'ok': ok_received
        })

    channel.close()

    # ----- 第二阶段:获取系统版本信息 -----
    status_lines = []
    try:
        print("\n>>> 获取系统版本信息...")
        stdin, stdout, stderr = client.exec_command('get system status')
        out = stdout.read().decode('utf-8', errors='ignore')
        err = stderr.read().decode('utf-8', errors='ignore')
        if err:
            print(f"错误: {err}")
        status_lines = parse_status_lines(out)
        if not status_lines:
            print("警告: 未在输出中找到预期的版本行")
    except Exception as e:
        print(f"获取系统状态失败: {e}")

    client.close()
    return results, status_lines

def main():
    parser = argparse.ArgumentParser(description='批量执行 pkg 恢复命令')
    parser.add_argument('--host', default=DEFAULT_HOST,
                        help=f'远程设备 IP(默认: {DEFAULT_HOST})')
    parser.add_argument('--user', default=DEFAULT_USER,
                        help=f'SSH 用户名(默认: {DEFAULT_USER})')
    parser.add_argument('--password', default=DEFAULT_PASSWORD,
                        help='SSH 密码(默认已内置)')
    parser.add_argument('--timeout', type=int, default=120,
                        help='每条命令等待 OK 的超时秒数(默认 120)')
    args = parser.parse_args()

    pkg_files = glob.glob("*.pkg")
    if not pkg_files:
        print("当前目录未找到 .pkg 文件")
        sys.exit(0)

    print(f"找到 {len(pkg_files)} 个 .pkg 文件:")
    for f in pkg_files:
        print(f"  {f}  -> 类别: {classify_pkg(f) or '未知'}")

    local_ip = get_local_inner_ip()
    if not local_ip:
        print("错误: 无法获取本机内网 IP,请检查网络设置。")
        sys.exit(1)
    print(f"本机内网 IP: {local_ip}")

    print("正在检测本机 TFTP 服务 (UDP 69)...")
    if not check_tftp_service(local_ip):
        print(f"错误: 本机 {local_ip} 的 TFTP 服务未启动或不可达,请先开启 TFTP 服务器。")
        sys.exit(1)
    print("TFTP 服务检测通过。")

    commands = generate_commands(pkg_files, local_ip)
    if not commands:
        print("没有可执行的命令。")
        sys.exit(0)

    print(f"\n将向 {args.host} 发送 {len(commands)} 条命令...")
    results, status_lines = ssh_execute_interactive(
        host=args.host,
        user=args.user,
        password=args.password,
        commands=commands,
        cmd_timeout=args.timeout
    )

    # ----- 统计与结果输出 -----
    success = sum(1 for r in results if r['ok'])
    fail = len(results) - success
    print("\n" + "="*50)
    print(f"执行完毕: 成功 {success} 条, 失败 {fail} 条")
    if status_lines:
        print("\n系统当前安全库版本:")
        for line in status_lines:
            print(f"  {line}")
    print("="*50)

if __name__ == '__main__':
    main()

Bat脚本

@echo off
title 飞塔特征库离线更新脚本

echo 正在启动 TFTP 服务...
start "" "TftpServer.exe"
timeout /t 3 /nobreak >nul

echo ================================
echo 开始执行更新脚本...
echo ================================
python update.py
echo ================================
echo 更新脚本执行完毕。
echo ================================

echo 正在关闭 TFTP 服务...
taskkill /f /im TftpServer.exe >nul 2>&1

echo 完成。
pause

评论