一、手动升级
TFTP服务器IP:192.168.10.2
应用控制
execute restore ips tftp apdb_OS6.2.0_35.00187.APDB.pkg 192.168.10.2互联网服务数据库
💡 Tip
会提示Command fail. Return code 49实际是更新成功了的,
execute restore other-objects tftp ffdb_fos62_00007.04418.pkg 192.168.10.2IPS
execute restore ips tftp nids_OS7.4.0_36.00204.NIDS.pkg 192.168.10.4AV
execute restore av tftp vsigupdate-OS7.4.0_4.04098.AVAI.pkg 192.168.10.4
execute restore av tftp vsigupdate-OS7.4.0_93.06995.ETDB.High.pkg 192.168.10.4
execute restore av tftp vsigupdate-OS7.4.0_93.06995.MMDB.pkg 192.168.10.4移动端恶意软件特征库
execute restore av tftp vsigupdate-OS6.2.0_93.06840.MMDB.pkg 192.168.10.2僵尸网络IP数据库
execute restore ips tftp IRISUpdate-OS6.2.0_5.000-fgt.pkg 192.168.10.2
二、脚本升级
每次手动更新太麻烦了,使用DeepSeek编写了一个py脚本进行自动更新,这样每次只需下载特征库升级包即可。
使用方法
在自定义文件夹中创建py和bat文件,并且将脚本内容粘贴进去。
py名字最好为:update.py,如果不是请自行修改bat脚本中py脚本名字。
将下载的离线升级包放在脚本所在文件夹
双击bat脚本执行升级即可


前置模块安装命令
pip install paramiko -i https://pypi.tuna.tsinghua.edu.cn/simplePython脚本
⚠️注意修改脚本中飞塔的ip和账号密码!
#!/usr/bin/env python3
"""
自动根据当前目录下的 .pkg 文件生成并执行恢复命令。
- 自动检测本机内网 IP 和 TFTP 服务
- 逐条在远程设备执行恢复命令,自动确认覆盖,等待 "OK" 完成
- 全部恢复完成后,执行 get system status 并提取关键版本信息
- 统计成功/失败数量
用法:
python deploy_pkg.py # 使用内置默认值
python deploy_pkg.py --host 10.0.0.1 --user admin --password pass
"""
import os
import sys
import glob
import socket
import time
import argparse
import paramiko
# ------------------- 内置默认配置 -------------------
DEFAULT_HOST = "192.168.1.100" # 远程设备 IP
DEFAULT_USER = "admin" # SSH 用户名
DEFAULT_PASSWORD = "admin" # SSH 密码
# ----------------------------------------------------
def get_local_inner_ip():
"""获取本机内网 IPv4 地址(优先私有地址)"""
hostname = socket.gethostname()
try:
ip_list = socket.gethostbyname_ex(hostname)[2]
except socket.gaierror:
ip_list = []
valid_ips = [ip for ip in ip_list if not ip.startswith('127.')]
if not valid_ips:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('10.255.255.255', 1))
valid_ips = [s.getsockname()[0]]
s.close()
except Exception:
return None
private_ips = [ip for ip in valid_ips
if ip.startswith('10.') or
ip.startswith('192.168.') or
(ip.startswith('172.') and 16 <= int(ip.split('.')[1]) <= 31)]
return private_ips[0] if private_ips else valid_ips[0]
def check_tftp_service(local_ip, port=69, timeout=2):
"""检测本机 TFTP 服务是否在运行(UDP 69)"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(timeout)
rrq_packet = b'\x00\x01' + b'test\x00' + b'netascii\x00'
try:
sock.sendto(rrq_packet, (local_ip, port))
data, addr = sock.recvfrom(1024)
return True
except socket.timeout:
return False
except Exception as e:
print(f"TFTP 检测异常: {e}")
return False
finally:
sock.close()
def classify_pkg(filename):
"""根据文件名前缀返回类别"""
base = os.path.basename(filename)
if base.startswith('ffdb_'):
return 'other-objects'
if base.startswith('nids_') or base.startswith('apdb_'):
return 'ips'
if base.startswith('vsigupdate-'):
return 'av'
return None
def generate_commands(pkg_files, local_ip):
"""生成 (类别, 完整命令) 的列表"""
commands = []
for f in pkg_files:
category = classify_pkg(f)
if category is None:
print(f"警告: 跳过无法识别的文件 {f}")
continue
cmd = f"execute restore {category} tftp {os.path.basename(f)} {local_ip}"
commands.append((category, cmd))
return commands
def parse_status_lines(text):
"""从 get system status 输出中提取六行关键版本信息"""
targets = [
"Virus-DB:",
"Extended DB:",
"AV AI/ML Model:",
"IPS-DB:",
"IPS-ETDB:",
"APP-DB:"
]
lines = []
for line in text.splitlines():
stripped = line.strip()
for t in targets:
if stripped.startswith(t):
lines.append(stripped)
break
return lines
def ssh_execute_interactive(host, user, password, commands, cmd_timeout=120):
"""
交互式 SSH 会话,逐条执行恢复命令。
返回 (results, status_lines)
results: 每条命令的执行记录,包含 'ok' 字段
status_lines: get system status 提取的关键版本行
"""
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(hostname=host, username=user, password=password,
look_for_keys=False, allow_agent=False)
print(f"成功连接到 {host}")
except Exception as e:
print(f"SSH 连接失败: {e}")
sys.exit(1)
# ----- 第一阶段:交互式执行恢复命令 -----
channel = client.invoke_shell()
time.sleep(0.5)
# 清空初始输出
while channel.recv_ready():
channel.recv(4096)
results = []
total = len(commands)
for idx, (category, cmd) in enumerate(commands, 1):
print(f"\n({idx}/{total}) 执行 [{category}]: {cmd}")
channel.sendall(cmd + '\n')
time.sleep(0.3)
output = b""
start_time = time.time()
sent_y = False
ok_received = False
while True:
if channel.recv_ready():
chunk = channel.recv(4096)
output += chunk
if not sent_y and b"Do you want to continue? (y/n)" in output:
time.sleep(0.3)
channel.sendall('y\n')
sent_y = True
print(" 自动确认: y")
if sent_y and b"OK" in output:
ok_received = True
else:
time.sleep(0.5)
elapsed = time.time() - start_time
if ok_received and not channel.recv_ready():
print(" 检测到 OK,命令完成")
break
if elapsed > cmd_timeout:
if not ok_received:
print(f" 警告: 未在 {cmd_timeout}s 内检测到 OK,继续下一条")
break
out_str = output.decode('utf-8', errors='ignore').strip()
print(out_str)
results.append({
'category': category,
'command': cmd,
'ok': ok_received
})
channel.close()
# ----- 第二阶段:获取系统版本信息 -----
status_lines = []
try:
print("\n>>> 获取系统版本信息...")
stdin, stdout, stderr = client.exec_command('get system status')
out = stdout.read().decode('utf-8', errors='ignore')
err = stderr.read().decode('utf-8', errors='ignore')
if err:
print(f"错误: {err}")
status_lines = parse_status_lines(out)
if not status_lines:
print("警告: 未在输出中找到预期的版本行")
except Exception as e:
print(f"获取系统状态失败: {e}")
client.close()
return results, status_lines
def main():
parser = argparse.ArgumentParser(description='批量执行 pkg 恢复命令')
parser.add_argument('--host', default=DEFAULT_HOST,
help=f'远程设备 IP(默认: {DEFAULT_HOST})')
parser.add_argument('--user', default=DEFAULT_USER,
help=f'SSH 用户名(默认: {DEFAULT_USER})')
parser.add_argument('--password', default=DEFAULT_PASSWORD,
help='SSH 密码(默认已内置)')
parser.add_argument('--timeout', type=int, default=120,
help='每条命令等待 OK 的超时秒数(默认 120)')
args = parser.parse_args()
pkg_files = glob.glob("*.pkg")
if not pkg_files:
print("当前目录未找到 .pkg 文件")
sys.exit(0)
print(f"找到 {len(pkg_files)} 个 .pkg 文件:")
for f in pkg_files:
print(f" {f} -> 类别: {classify_pkg(f) or '未知'}")
local_ip = get_local_inner_ip()
if not local_ip:
print("错误: 无法获取本机内网 IP,请检查网络设置。")
sys.exit(1)
print(f"本机内网 IP: {local_ip}")
print("正在检测本机 TFTP 服务 (UDP 69)...")
if not check_tftp_service(local_ip):
print(f"错误: 本机 {local_ip} 的 TFTP 服务未启动或不可达,请先开启 TFTP 服务器。")
sys.exit(1)
print("TFTP 服务检测通过。")
commands = generate_commands(pkg_files, local_ip)
if not commands:
print("没有可执行的命令。")
sys.exit(0)
print(f"\n将向 {args.host} 发送 {len(commands)} 条命令...")
results, status_lines = ssh_execute_interactive(
host=args.host,
user=args.user,
password=args.password,
commands=commands,
cmd_timeout=args.timeout
)
# ----- 统计与结果输出 -----
success = sum(1 for r in results if r['ok'])
fail = len(results) - success
print("\n" + "="*50)
print(f"执行完毕: 成功 {success} 条, 失败 {fail} 条")
if status_lines:
print("\n系统当前安全库版本:")
for line in status_lines:
print(f" {line}")
print("="*50)
if __name__ == '__main__':
main()Bat脚本
@echo off
title 飞塔特征库离线更新脚本
echo 正在启动 TFTP 服务...
start "" "TftpServer.exe"
timeout /t 3 /nobreak >nul
echo ================================
echo 开始执行更新脚本...
echo ================================
python update.py
echo ================================
echo 更新脚本执行完毕。
echo ================================
echo 正在关闭 TFTP 服务...
taskkill /f /im TftpServer.exe >nul 2>&1
echo 完成。
pause
