mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-06-18 20:10:13 +02:00
129 lines
3.8 KiB
Go
129 lines
3.8 KiB
Go
package security
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"cyberstrike-ai/internal/config"
|
|
"cyberstrike-ai/internal/mcp"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// setupTestExecutor 创建测试用的执行器
|
|
func setupTestExecutor(t *testing.T) (*Executor, *mcp.Server) {
|
|
logger := zap.NewNop()
|
|
mcpServer := mcp.NewServer(logger)
|
|
|
|
cfg := &config.SecurityConfig{
|
|
Tools: []config.ToolConfig{},
|
|
}
|
|
|
|
executor := NewExecutor(cfg, mcpServer, logger)
|
|
return executor, mcpServer
|
|
}
|
|
|
|
func TestExecutor_ExecuteInternalTool_UnknownTool(t *testing.T) {
|
|
executor, _ := setupTestExecutor(t)
|
|
|
|
ctx := context.Background()
|
|
args := map[string]interface{}{
|
|
"test": "value",
|
|
}
|
|
|
|
// 测试未知的内部工具类型
|
|
toolResult, err := executor.executeInternalTool(ctx, "unknown_tool", "internal:unknown_tool", args)
|
|
if err != nil {
|
|
t.Fatalf("执行内部工具失败: %v", err)
|
|
}
|
|
|
|
if !toolResult.IsError {
|
|
t.Fatal("未知的工具类型应该返回错误")
|
|
}
|
|
|
|
if !strings.Contains(toolResult.Content[0].Text, "未知的内部工具类型") {
|
|
t.Errorf("错误消息应该包含'未知的内部工具类型'")
|
|
}
|
|
}
|
|
|
|
func TestExecuteSystemCommand_BackgroundDoesNotBlockOnChildStdout(t *testing.T) {
|
|
executor, _ := setupTestExecutor(t)
|
|
// 子进程先向 stdout 写无换行字符再长时间 sleep;若与 echo $pid 共享管道且未重定向子进程 stdout,
|
|
// ReadString('\n') 会阻塞到子进程退出。后台包装须将子进程标准流与 PID 行分离。
|
|
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
|
|
defer cancel()
|
|
args := map[string]interface{}{
|
|
"command": `(sh -c 'printf x; sleep 120') &`,
|
|
"shell": "sh",
|
|
}
|
|
res, err := executor.executeSystemCommand(ctx, args)
|
|
if err != nil {
|
|
t.Fatalf("executeSystemCommand: %v", err)
|
|
}
|
|
if res == nil || res.IsError {
|
|
t.Fatalf("expected success, got %+v", res)
|
|
}
|
|
txt := res.Content[0].Text
|
|
if !strings.Contains(txt, "后台命令已启动") {
|
|
t.Fatalf("unexpected body: %q", txt)
|
|
}
|
|
}
|
|
|
|
func TestBuildCommandArgs_NmapSkipsEmptyOptionalFlags(t *testing.T) {
|
|
pos1 := 1
|
|
executor, _ := setupTestExecutor(t)
|
|
toolConfig := &config.ToolConfig{
|
|
Name: "nmap",
|
|
Command: "nmap",
|
|
Args: []string{"-sT", "-sV", "-sC"},
|
|
Parameters: []config.ParameterConfig{
|
|
{Name: "target", Type: "string", Required: true, Position: &pos1, Format: "positional"},
|
|
{Name: "ports", Type: "string", Flag: "-p", Format: "flag"},
|
|
{Name: "timing", Type: "string", Template: "-T{value}", Format: "template"},
|
|
{Name: "nse_scripts", Type: "string", Flag: "--script", Format: "flag"},
|
|
{Name: "os_detection", Type: "bool", Flag: "-O", Format: "flag", Default: false},
|
|
{Name: "aggressive", Type: "bool", Flag: "-A", Format: "flag", Default: false},
|
|
{Name: "scan_type", Type: "string", Format: "template", Template: "{value}"},
|
|
{Name: "additional_args", Type: "string", Format: "positional"},
|
|
},
|
|
}
|
|
|
|
args := map[string]interface{}{
|
|
"target": "110.52.223.114",
|
|
"ports": "21, 22, 80, 443",
|
|
"timing": "4",
|
|
"nse_scripts": "",
|
|
"scan_type": "",
|
|
"os_detection": false,
|
|
"aggressive": false,
|
|
"additional_args": "-Pn",
|
|
}
|
|
|
|
cmdArgs := executor.buildCommandArgs("nmap", toolConfig, args)
|
|
joined := strings.Join(cmdArgs, " ")
|
|
|
|
if strings.Contains(joined, "--script") {
|
|
t.Fatalf("empty nse_scripts must not emit --script, got: %v", cmdArgs)
|
|
}
|
|
if !strings.Contains(joined, "110.52.223.114") {
|
|
t.Fatalf("target missing from args: %v", cmdArgs)
|
|
}
|
|
// target 应出现在 -Pn 之前,避免被误当作 --script 的参数
|
|
pnIdx := indexOf(cmdArgs, "-Pn")
|
|
targetIdx := indexOf(cmdArgs, "110.52.223.114")
|
|
if pnIdx < 0 || targetIdx < 0 || targetIdx >= pnIdx {
|
|
t.Fatalf("expected target before -Pn, got: %v", cmdArgs)
|
|
}
|
|
}
|
|
|
|
func indexOf(slice []string, s string) int {
|
|
for i, v := range slice {
|
|
if v == s {
|
|
return i
|
|
}
|
|
}
|
|
return -1
|
|
}
|