diff --git a/.github/workflows/python-package-windows.yml b/.github/workflows/python-package-windows.yml index 061dd2476ae..66a1668b3df 100644 --- a/.github/workflows/python-package-windows.yml +++ b/.github/workflows/python-package-windows.yml @@ -30,7 +30,7 @@ jobs: architecture: 'x86' - name: Install dependencies - run: pip install --upgrade pytest requests + run: pip install --upgrade pytest requests Flask - name: Run analyzer unit tests run: | diff --git a/agent/agent_flask.py b/agent/agent_flask.py new file mode 100644 index 00000000000..15015d87d17 --- /dev/null +++ b/agent/agent_flask.py @@ -0,0 +1,603 @@ +# Copyright (C) 2010-2019 Cuckoo Foundation. +# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org +# See the file 'docs/LICENSE' for copying permission. +import argparse +import base64 +import enum +import multiprocessing +import os +import platform +import shlex +import shutil +import socket +import stat +import subprocess +import sys +import tempfile +import traceback +from io import StringIO +from zipfile import ZipFile + +try: + from flask import Flask, request, jsonify, send_file +except ImportError: + sys.exit("Missed dependency: pip3 install flask") + +if sys.version_info[:2] < (3, 8): + sys.exit("You are running an incompatible version of Python, please use >= 3.8") +""" +# You must run x86 version not x64 +# The analysis process interacts with low-level Windows libraries that need a +# x86 Python to be running. +# (see https://github.com/kevoreilly/CAPEv2/issues/1680) +if sys.maxsize > 2**32 and sys.platform == "win32": + sys.exit("You should install python3 x86! not x64") +""" +app = Flask("CAPE Agent") +AGENT_VERSION = "0.18" +AGENT_FEATURES = [ + "execpy", + "execute", + "pinning", + "logs", + "largefile", + "unicodepath", +] +BASE_64_ENCODING = "base64" + +if sys.platform == "win32": + AGENT_FEATURES.append("mutex") + MUTEX_TIMEOUT_MS = 500 + from ctypes import WinError, windll + + kernel32 = windll.kernel32 + SYNCHRONIZE = 0x100000 + ERROR_FILE_NOT_FOUND = 0x2 + WAIT_ABANDONED = 0x00000080 + WAIT_OBJECT_0 = 0x0 + WAIT_TIMEOUT = 0x102 + WAIT_FAILED = 0xFFFFFFFF + + +class Status(enum.IntEnum): + INIT = 1 + RUNNING = 2 + COMPLETE = 3 + FAILED = 4 + EXCEPTION = 5 + + def __str__(self): + return f"{self.name.lower()}" + + @classmethod + def _missing_(cls, value): + if not isinstance(value, str): + return None + value = value.lower() + for member in cls: + if str(member) == value: + return member + if value.isnumeric() and int(value) == member.value: + return member + return None + + +ANALYZER_FOLDER = "" +agent_mutexes = {} +"""Holds handles of mutexes held by the agent.""" +state = { + "status": Status.INIT, + "description": "", + "async_subprocess": None, + "mutexes": agent_mutexes, +} + + +def verify_request(request): + if "client_ip" in state and request.remote_addr != state["client_ip"]: + if request.remote_addr != "127.0.0.1": + return + if request.path != "/status" or request.method != "POST": + return + +def shutdown_server(): + func = request.environ.get('werkzeug.server.shutdown') + if func is None: + raise RuntimeError('Not running with the Werkzeug Server') + func() + +''' +class send_file: + """Wrapper that represents Flask.send_file functionality.""" + + def __init__(self, path, encoding): + self.length = None + self.path = path + self.status_code = 200 + self.encoding = encoding + + def okay_to_send(self): + return os.path.isfile(self.path) and os.access(self.path, os.R_OK) + + def init(self): + if self.okay_to_send(): + if self.encoding != BASE_64_ENCODING: + self.length = os.path.getsize(self.path) + else: + self.status_code = 404 + + def write(self, httplog, sock): + if not self.okay_to_send(): + return + + try: + with open(self.path, "rb") as f: + buf = f.read(1024 * 1024) + while buf: + if self.encoding == BASE_64_ENCODING: + buf = base64.b64encode(buf) + sock.write(buf) + buf = f.read(1024 * 1024) + except Exception as ex: + httplog.log_error(f"Error reading file {self.path}: {ex}") + + def headers(self, obj): + obj.send_header("Content-Length", self.length) +''' + +def isAdmin(): + is_admin = None + try: + if sys.platform == "win32": + import ctypes + + is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0 + else: + is_admin = os.getuid() == 0 + except Exception as e: + print(e) + + return is_admin + + +def json_error(error_code: int, message: str, **kwargs) -> jsonify: + r = jsonify(message=message, error_code=error_code, **kwargs) + r.status_code = error_code + return r + + +def json_exception(message: str) -> jsonify: + r = jsonify(message=message, error_code=500, traceback=traceback.format_exc()) + r.status_code = 500 + return r + + +def json_success(message: str, status_code=200, **kwargs) -> jsonify: + return jsonify(message=message, status_code=status_code, **kwargs) + + +@app.route("/") +def get_index(): + is_admin = isAdmin() + return json_success("CAPE Agent!", version=AGENT_VERSION, features=AGENT_FEATURES, is_user_admin=bool(is_admin)) + + +def get_subprocess_status(): + """Return the subprocess status.""" + async_subprocess = state.get("async_subprocess") + message = "Analysis status" + exitcode = async_subprocess.exitcode + if exitcode is None or (sys.platform == "win32" and exitcode == 259): + # Process is still running. + state["status"] = Status.RUNNING + return json_success( + message=message, + status=str(state.get("status")), + description=state.get("description"), + process_id=async_subprocess.pid, + ) + # Process completed; reset async subprocess state. + state["async_subprocess"] = None + if exitcode == 0: + state["status"] = Status.COMPLETE + state["description"] = "" + else: + state["status"] = Status.FAILED + state["description"] = f"Exited with exit code {exitcode}" + return json_success( + message=message, + status=str(state.get("status")), + description=state.get("description"), + exitcode=exitcode, + ) + + +def open_mutex(mutex_name): + assert sys.platform == "win32" + access = SYNCHRONIZE # only flag the mutex for use + inherit_handle = False # don't pass the handle to children + hndl_mutex = kernel32.OpenMutexW(access, inherit_handle, mutex_name) + if not hndl_mutex: + winerr = WinError() + if winerr.errno == ERROR_FILE_NOT_FOUND: + return None, json_error(404, "mutex not found") + return None, json_error(500, f"error accessing mutex: {winerr}") + return hndl_mutex, None + + +def wait_mutex(hndl_mutex): + assert sys.platform == "win32" + ret = kernel32.WaitForSingleObject(hndl_mutex, MUTEX_TIMEOUT_MS) + if ret in (WAIT_ABANDONED, WAIT_OBJECT_0): + return True, None + elif ret == WAIT_TIMEOUT: + return False, json_error(408, "timeout waiting for mutex") + elif ret == WAIT_FAILED: + # get the extended error information + winerr = WinError() + return False, json_error(500, f"failed waiting for mutex: {winerr}") + else: + return False, json_error(500, f"failed waiting for mutex: {ret}") + + +def release_mutex(hndl_mutex): + assert sys.platform == "win32" + ret = kernel32.ReleaseMutex(hndl_mutex) + if not ret: + # get the extended error information + winerr = WinError() + return False, json_error(500, f"failed releasing mutex: {winerr}") + return True, None + + +@app.route("/status") +def get_status(): + if state["status"] != Status.COMPLETE and state.get("async_subprocess") is not None: + return get_subprocess_status() + return json_success("Analysis status", status=str(state.get("status")), description=state.get("description")) + + +@app.route("/mutex", methods=["POST"]) +def post_mutex(): + if sys.platform != "win32": + return json_error(400, f"mutex feature not supported on {sys.platform}") + mutex_name = request.form.get("mutex", "") + if not mutex_name: + return json_error(400, "no mutex provided") + if mutex_name in agent_mutexes: + return json_success(f"have mutex: {mutex_name}") + + # does the mutex exist? + hndl_mutex, error = open_mutex(mutex_name) + if error: + return error + + # try waiting on it + ok, error = wait_mutex(hndl_mutex) + if ok: + # save the mutex handle for future requests + agent_mutexes[mutex_name] = hndl_mutex + return json_success(f"got mutex: {mutex_name}", status_code=201) + return error + + +@app.route("/mutex", methods=["DELETE"]) +def delete_mutex(): + if sys.platform != "win32": + return json_error(400, f"mutex feature not supported on {sys.platform}") + mutex_name = request.form.get("mutex", "") + if not mutex_name: + return json_error(400, "no mutex provided") + if mutex_name not in agent_mutexes: + return json_error(404, f"mutex does not exist: {mutex_name}") + hndl_mutex = agent_mutexes.pop(mutex_name) + ok, error = release_mutex(hndl_mutex) + if ok: + return json_success(f"released mutex: {mutex_name}") + return error + + +@app.route("/status", methods=["POST"]) +def put_status(): + try: + status = Status(request.form.get("status")) + except ValueError: + return json_error(400, "No valid status has been provided") + + state["status"] = status + state["description"] = request.form.get("description") + return json_success("Analysis status updated") + + +@app.route("/logs") +def get_logs(): + if isinstance(sys.stdout, StringIO): + stdoutbuf = sys.stdout.getvalue() + stderrbuf = sys.stderr.getvalue() + else: + stdoutbuf = "verbose mode, stdout not saved" + stderrbuf = "verbose mode, stderr not saved" + return json_success("Agent logs", stdout=stdoutbuf, stderr=stderrbuf) + + +@app.route("/system") +def get_system(): + return json_success("System", system=platform.system()) + + +@app.route("/environ") +def get_environ(): + return json_success("Environment variables", environ=dict(os.environ)) + + +@app.route("/path") +def get_path(): + return json_success("Agent path", filepath=os.path.abspath(__file__)) + + +@app.route("/mkdir", methods=["POST"]) +def do_mkdir(): + if "dirpath" not in request.form: + return json_error(400, "No dirpath has been provided") + + try: + mode = int(request.form.get("mode", 0o777)) + + os.makedirs(request.form["dirpath"], mode=mode, exist_ok=True) + except Exception as ex: + print(f"error creating dir {ex}") + return json_exception("Error creating directory") + + return json_success("Successfully created directory") + + +@app.route("/mktemp", methods=("GET", "POST")) +def do_mktemp(): + suffix = request.form.get("suffix", "") + prefix = request.form.get("prefix", "tmp") + dirpath = request.form.get("dirpath") + + try: + fd, filepath = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dirpath) + except Exception: + return json_exception("Error creating temporary file") + + os.close(fd) + + return json_success("Successfully created temporary file", filepath=filepath) + + +@app.route("/mkdtemp", methods=("GET", "POST")) +def do_mkdtemp(): + suffix = request.form.get("suffix", "") + prefix = request.form.get("prefix", "tmp") + dirpath = request.form.get("dirpath") + + try: + dirpath = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dirpath) + except Exception: + return json_exception("Error creating temporary directory") + + return json_success("Successfully created temporary directory", dirpath=dirpath) + + +@app.route("/store", methods=["POST"]) +def do_store(): + if "filepath" not in request.form: + return json_error(400, "No filepath has been provided") + + if "file" not in request.files: + return json_error(400, "No file has been provided") + + try: + with open(request.form["filepath"], "wb") as f: + shutil.copyfileobj(request.files["file"], f, 10 * 1024 * 1024) + except Exception as ex: + return json_exception(f"Error storing file: {ex}") + + return json_success("Successfully stored file") + + +@app.route("/retrieve", methods=["POST"]) +def do_retrieve(): + if "filepath" not in request.form: + return json_error(400, "No filepath has been provided") + if not os.path.exists(request.form["filepath"]): + return json_error(404, "Filepath doesn't exist") + return send_file(request.form["filepath"], request.form.get("encoding", "")) + + +@app.route("/extract", methods=["POST"]) +def do_extract(): + if "dirpath" not in request.form: + return json_error(400, "No dirpath has been provided") + + # import code;code.interact(local=dict(locals(), **globals())) + if "zipfile" not in request.files: + return json_error(400, "No zip file has been provided") + + try: + with ZipFile(request.files["zipfile"], "r") as archive: + archive.extractall(request.form["dirpath"]) + except Exception as ex: + return json_exception(f"Error extracting zip file {ex}") + + return json_success("Successfully extracted zip file") + + +@app.route("/remove", methods=["POST"]) +def do_remove(): + if "path" not in request.form: + return json_error(400, "No path has been provided") + + try: + if os.path.isdir(request.form["path"]): + # Mark all files as readable so they can be deleted. + for dirpath, _, filenames in os.walk(request.form["path"]): + for filename in filenames: + os.chmod(os.path.join(dirpath, filename), stat.S_IWRITE) + + shutil.rmtree(request.form["path"]) + message = "Successfully deleted directory" + elif os.path.isfile(request.form["path"]): + os.chmod(request.form["path"], stat.S_IWRITE) + os.remove(request.form["path"]) + message = "Successfully deleted file" + else: + return json_error(404, "Path provided does not exist") + except Exception: + return json_exception("Error removing file or directory") + + return json_success(message) + + +@app.route("/execute", methods=["POST"]) +def do_execute(): + local_ip = socket.gethostbyname(socket.gethostname()) + + if "command" not in request.form: + return json_error(400, "No command has been provided") + command_to_execute = shlex.split(request.form["command"]) + + # only allow date command from localhost. Even this is just to + # let it be tested + allowed_commands = ["date", "cmd /c date /t"] + if request.remote_addr in ("127.0.0.1", local_ip) and request.form["command"] not in allowed_commands: + return json_error(500, "Not allowed to execute commands") + + # Execute the command asynchronously? As a shell command? + async_exec = "async" in request.form + shell = "shell" in request.form + + cwd = request.form.get("cwd") + stdout = stderr = None + + try: + if async_exec: + subprocess.Popen(command_to_execute, shell=shell, cwd=cwd) + else: + p = subprocess.Popen(command_to_execute, shell=shell, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + stdout, stderr = p.communicate() + if request.form.get("encoding", "") == BASE_64_ENCODING: + stdout = base64.b64encode(stdout) + stderr = base64.b64encode(stderr) + except Exception as ex: + state["status"] = Status.FAILED + state["description"] = "Error execute command" + return json_exception(f"Error executing command: {ex}") + + state["status"] = Status.RUNNING + state["description"] = "" + return json_success("Successfully executed command", stdout=stdout, stderr=stderr) + + +def run_subprocess(command_args, cwd, base64_encode, shell=False): + """Execute the subprocess, wait for completion. + + Return the exitcode (returncode), the stdout, and the stderr. + """ + p = subprocess.Popen( + args=command_args, + cwd=cwd, + shell=shell, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout, stderr = p.communicate() + if base64_encode: + stdout = base64.b64encode(stdout) + stderr = base64.b64encode(stderr) + return p.returncode, stdout, stderr + + +def background_subprocess(command_args, cwd, base64_encode, shell=False): + """Run subprocess, wait for completion, then exit. + + This process must exit, so the parent process (agent) can find the exit status.""" + # TODO: return the stdout/stderr to the parent process. + returncode, stdout, stderr = run_subprocess(command_args, cwd, base64_encode, shell) + sys.stdout.write(stdout.decode("ascii")) + sys.stderr.write(stderr.decode("ascii")) + sys.exit(returncode) + + +def spawn(args, cwd, base64_encode, shell=False): + """Kick off a subprocess in the background.""" + run_subprocess_args = [args, cwd, base64_encode, shell] + proc = multiprocessing.Process(target=background_subprocess, name=f"child process {args[1]}", args=run_subprocess_args) + proc.start() + state["status"] = Status.RUNNING + state["description"] = "" + state["async_subprocess"] = proc + return json_success("Successfully spawned command", process_id=proc.pid) + + +@app.route("/execpy", methods=["POST"]) +def do_execpy(): + if "filepath" not in request.form: + return json_error(400, "No Python file has been provided") + + # Execute the command asynchronously? As a shell command? + async_exec = "async" in request.form + base64_encode = request.form.get("encoding", "") == BASE_64_ENCODING + + cwd = request.form.get("cwd") + + args = ( + sys.executable, + request.form["filepath"], + ) + + if async_exec and state["status"] == Status.RUNNING and state["async_subprocess"]: + return json_error(400, "Async process already running.") + try: + if async_exec: + return spawn(args, cwd, base64_encode) + exitcode, stdout, stderr = run_subprocess(args, cwd, base64_encode) + if exitcode == 0: + state["status"] = Status.COMPLETE + state["description"] = "" + return json_success("Successfully executed command", stdout=stdout, stderr=stderr) + # Process exited with non-zero result. + state["status"] = Status.FAILED + message = "Error executing python command." + state["description"] = message + return json_error(400, message, stdout=stdout, stderr=stderr, exitcode=exitcode) + except Exception as ex: + state["status"] = Status.FAILED + state["description"] = "Error executing Python command" + return json_exception(f"Error executing Python command: {ex}") + + +@app.route("/pinning") +def do_pinning(): + if "client_ip" in state: + return json_error(500, "Agent has already been pinned to an IP!") + + state["client_ip"] = request.remote_addr + return json_success("Successfully pinned Agent", client_ip=request.remote_addr) + + +@app.route("/kill") +def do_kill(): + shutdown = request.environ.get("werkzeug.server.shutdown") + if shutdown is None: + return json_error(500, "Not running with the Werkzeug server") + + shutdown() + return json_success("Quit the CAPE Agent") + + +if __name__ == "__main__": + multiprocessing.set_start_method("spawn") + parser = argparse.ArgumentParser() + parser.add_argument("host", nargs="?", default="0.0.0.0") + parser.add_argument("port", type=int, nargs="?", default=8000) + parser.add_argument("-v", "--verbose", action="store_true") + args = parser.parse_args() + + if not args.verbose: + sys.stdout = StringIO() + sys.stderr = StringIO() + + app.run(host=args.host, port=args.port) diff --git a/agent/readme.md b/agent/readme.md new file mode 100644 index 00000000000..b6bdf500fb3 --- /dev/null +++ b/agent/readme.md @@ -0,0 +1,4 @@ +### Agent vs Agent flask +* Agent: Works on python < 3.13 Original agent uses native python library, but most of the funcionality were ripped from flask itself. The deprecation of CGI and no easy proper replacement of it. Forced to think how to handle it better. +* Agent_flask: Works on all version of python. The same agent with modernized logic and simplified code. It has dependency so to use it inside windows guest you need to install: `pip3 install flask`. Tested version: `Flask==3.0.3`. + diff --git a/agent/test_agent_flask.py b/agent/test_agent_flask.py new file mode 100644 index 00000000000..fa7d1849cd2 --- /dev/null +++ b/agent/test_agent_flask.py @@ -0,0 +1,750 @@ +"""Tests for the agent.""" + +# import base64 +import datetime +import io +# import json +import multiprocessing +import os +import pathlib +import random +import shutil +import sys +import tempfile +import time +import unittest +import uuid +import zipfile +from unittest import mock +from urllib.parse import urljoin + +import pytest +import requests + +import agent_flask as agent +from agent_flask import app + + +HOST = "127.0.0.1" +PORT = 8000 +BASE_URL = f"http://{HOST}:{PORT}" + +DIRPATH = os.path.join(tempfile.gettempdir(), str(uuid.uuid4())) + + +def make_temp_name(): + return str(uuid.uuid4()) + +class TestAgentFunctions: + @mock.patch("sys.platform", "win32") + def test_get_subprocess_259(self): + with app.app_context(): + mock_process_id = 999998 + mock_subprocess = mock.Mock(spec=multiprocessing.Process) + mock_subprocess.exitcode = 259 + mock_subprocess.pid = mock_process_id + with mock.patch.dict(agent.state, {"async_subprocess": mock_subprocess}): + actual = agent.get_subprocess_status() + assert actual.status_code == 200 + actual_json = actual.json + assert actual_json["status"] == "running" + assert actual_json["process_id"] == mock_process_id + + +@mock.patch("sys.platform", "linux") +class TestMutexAPILinux(unittest.TestCase): + def test_post_mutex_linux(self): + """Mutex POSTs are only supported on win32""" + response = app.test_client().post(f"{BASE_URL}/mutex", data={"mutex": self.id()}) + assert response.json["message"] == "mutex feature not supported on linux" + assert response.status_code == 400 + + def test_delete_mutex_linux(self): + """Mutex DELETEs are only supported on win32""" + response = app.test_client().delete(f"{BASE_URL}/mutex", data={"mutex": self.id()}) + #assert isinstance(response, agent.jsonify) + assert response.json["message"] == "mutex feature not supported on linux" + assert response.status_code == 400 + + +@mock.patch("sys.platform", "win32") +class TestMutexAPIWin32(unittest.TestCase): + def test_post_mutex_win32_201(self): + """Mutex POSTs succeed with mocked mutex APIs""" + # mutex = self.id() + # agent.request.form["mutex"] = mutex + response = app.test_client().post(f"{BASE_URL}/mutex", data={"mutex": self.id()}) + # fake handle mutex based on test id + hndl_mutex = self.id() + + # mock opening a mutex returning the fake handle + open_mutex_mock = mock.MagicMock() + open_mutex_mock.return_value = hndl_mutex, None + agent.open_mutex = open_mutex_mock + + # mock mutex is acquired + wait_mutex_mock = mock.MagicMock() + wait_mutex_mock.return_value = True, None + agent.wait_mutex = wait_mutex_mock + + response = app.test_client().post(f"{BASE_URL}/mutex", data={"mutex": self.id()}) + wait_mutex_mock.assert_called_once_with(hndl_mutex) + # assert isinstance(response, agent.jsonify) + assert response.status_code == 201 + + def test_post_mutex_win32_error_mutex_doesnt_exist(self): + """Mutex POSTs fail gracefully when mutexes won't open""" + # mutex = self.id() + response = app.test_client().post(f"{BASE_URL}/mutex", data={"mutex": self.id()}) + + # mock opening a mutex returning an error + open_mutex_mock = mock.MagicMock() + mock_error = mock.MagicMock() + open_mutex_mock.return_value = None, mock_error + agent.open_mutex = open_mutex_mock + + # response = agent.post_mutex() + response = app.test_client().post(f"{BASE_URL}/mutex", data={"mutex": self.id()}) + assert response is mock_error + + def test_post_mutex_win32_error_mutex_wait_failed(self): + """Mutex POSTs fail gracefully when mutex waiting fails""" + mutex = self.id() + # agent.request.form["mutex"] = mutex + response = app.test_client().post(f"{BASE_URL}/mutex", data={"mutex": mutex}) + + # fake handle mutex based on test id + hndl_mutex = self.id() + + # mock opening a mutex returning the fake handle + open_mutex_mock = mock.MagicMock() + mock_error = mock.MagicMock() + open_mutex_mock.return_value = hndl_mutex, None + agent.open_mutex = open_mutex_mock + + # mock mutex fails to be acquired + wait_mutex_mock = mock.MagicMock() + mock_error = mock.MagicMock() + wait_mutex_mock.return_value = None, mock_error + agent.wait_mutex = wait_mutex_mock + + # response = agent.post_mutex() + response = app.test_client().post(f"{BASE_URL}/mutex", data={"mutex": self.id()}) + open_mutex_mock.assert_called_once_with(mutex) + wait_mutex_mock.assert_called_once_with(hndl_mutex) + assert response is mock_error + + def test_delete_mutex_win32_404(self): + """Mutex DELETEs 404 when not held""" + mutex = self.id() + # agent.request.form["mutex"] = mutex + response = app.test_client().post(f"{BASE_URL}/mutex", data={"mutex": self.id()}) + self.assertNotIn(mutex, agent.agent_mutexes) + # response = agent.delete_mutex() + response = app.test_client().delete(f"{BASE_URL}/mutex", data={"mutex": self.id()}) + # assert isinstance(response, agent.jsonify) + assert response.status_code == 404 + + def test_delete_mutex_win32_error_releasing(self): + mutex = self.id() + # agent.request.form["mutex"] = mutex + response = app.test_client().post(f"{BASE_URL}/mutex", data={"mutex": self.id()}) + + # inject a previously acquired mutex + hndl_mutex_mock = mock.MagicMock() + agent.agent_mutexes[mutex] = hndl_mutex_mock + + # mock mutex fails to be released + release_mutex_mock = mock.MagicMock() + mock_error = mock.MagicMock() + release_mutex_mock.return_value = None, mock_error + agent.release_mutex = release_mutex_mock + + # response = agent.delete_mutex() + response = app.test_client().delete(f"{BASE_URL}/mutex", data={"mutex": mutex}) + assert response is mock_error + + def test_delete_mutex_win32_200(self): + mutex = self.id() + # agent.request.form["mutex"] = mutex + response = app.test_client().post(f"{BASE_URL}/mutex", data={"mutex": self.id()}) + + # inject a previously acquired mutex + hndl_mutex_mock = mock.MagicMock() + agent.agent_mutexes[mutex] = hndl_mutex_mock + + # mock mutex is released + release_mutex_mock = mock.MagicMock() + release_mutex_mock.return_value = True, None + agent.release_mutex = release_mutex_mock + + # response = agent.delete_mutex() + response = app.test_client().delete(f"{BASE_URL}/mutex", data={"mutex": self.id()}) + release_mutex_mock.assert_called_once_with(hndl_mutex_mock) + # assert isinstance(response, agent.jsonify) + assert response.status_code == 200 + + +class TestAgent: + """Test the agent API.""" + + agent_process: multiprocessing.Process = None + + def setUp(self): + agent.state = {"status": agent.Status.INIT, "description": "", "async_subprocess": None} + ev = multiprocessing.Event() + self.agent_process = multiprocessing.Process( + target=agent.app.run, + kwargs={"host": HOST, "port": PORT, "event": ev}, + ) + self.agent_process.start() + + # Wait for http server to start. + # if not ev.wait(5.0): + # raise Exception("Failed to start agent HTTP server") + + # Create temp directory for tests, as makes tidying up easier + os.mkdir(DIRPATH, 0o777) + assert os.path.isdir(DIRPATH) + + def tearDown(self): + # Remove the temporary directory and files. + try: + # Test the kill endpoint, which shuts down the agent service. + r = app.test_client().get(f"{BASE_URL}/kill") + assert r.status_code == 200 + assert r.json()["message"] == "Quit the CAPE Agent" + except requests.exceptions.ConnectionError: + pass + shutil.rmtree(DIRPATH, ignore_errors=True) + assert not os.path.isdir(DIRPATH) + + # Ensure agent process completes; release resources. + self.agent_process.join() + self.agent_process.close() + + @staticmethod + def non_existent_directory(): + root = pathlib.Path(tempfile.gettempdir()).root + current_pid = os.getpid() + non_existent = pathlib.Path(root, str(current_pid), str(random.randint(10000, 99999))) + assert not os.path.isdir(non_existent) + assert not os.path.exists(non_existent) + return non_existent + + @staticmethod + def confirm_status(expected_status): + """Do a get and check the status.""" + status_url = urljoin(BASE_URL, "status") + r = app.test_client().get(status_url) + print(r, r.json) + js = r.json + assert js["message"] == "Analysis status" + assert js["status"] == expected_status + assert r.status_code == 200 + return js + + @staticmethod + def create_file(path, contents): + """Create the named file with the supplied contents.""" + with open(path, "w") as file: + file.write(contents) + assert os.path.exists(path) + assert os.path.isfile(path) + + @staticmethod + def file_contains(path, expected_contents): + """Examine the contents of a file.""" + with open(path) as file: + actual_contents = file.read() + return bool(expected_contents in actual_contents) + + @classmethod + def store_file(cls, tmp, file_contents): + """Store a file via the API, with the given contents. Return the filepath.""" + contents = os.linesep.join(file_contents) + tmp.write(contents) + tmp.seek(0) + upload_file = {"file": tmp.name} + # filepath = os.path.join(DIRPATH, make_temp_name() + ".py") + filepath = tmp.name + form = {"filepath": filepath} + js = cls.post_form("store", form, files=upload_file) + assert js["message"] == "Successfully stored file" + assert os.path.isfile(filepath) + assert cls.file_contains(filepath, file_contents[0]) + assert cls.file_contains(filepath, file_contents[-1]) + return filepath + + @staticmethod + def post_form(url_part, form_data={}, expected_status=200, files=None): + """Post to the URL and return the json.""" + url = urljoin(BASE_URL, url_part) + if files: + form_data.update(files) + # r = requests.post(url, data=form_data, files=files) + r = app.test_client().post(url, data=form_data) + assert r.status_code == expected_status + js = r.json + return js + + def test_root(self): + r = app.test_client().get(f"{BASE_URL}/") + assert r.status_code == 200 + js = r.json + assert js["message"] == "CAPE Agent!" + assert "version" in js + assert "features" in js + assert "execute" in js["features"] + assert "execpy" in js["features"] + assert "pinning" in js["features"] + + def test_status_write_valid_text(self): + """Write a status of 'exception'.""" + # First, confirm the status is NOT 'exception'. + _ = self.confirm_status(str(agent.Status.INIT)) + form = {"status": "exception"} + url_part = "status" + _ = self.post_form(url_part, form) + _ = self.confirm_status(str(agent.Status.EXCEPTION)) + + def test_status_write_valid_number(self): + """Write a status of '5'.""" + # First, confirm the status is NOT 'exception'. + _ = self.confirm_status(str(agent.Status.INIT)) + form = {"status": 5} + url_part = "status" + _ = self.post_form(url_part, form) + _ = self.confirm_status(str(agent.Status.EXCEPTION)) + + def test_status_write_invalid(self): + """Fail to provide a valid status.""" + form = {"description": "Test Status"} + js = self.post_form("status", form, 400) + assert js["message"] == "No valid status has been provided" + + form = {"status": "unexpected value"} + js = self.post_form("status", form, 400) + assert js["message"] == "No valid status has been provided" + _ = self.confirm_status(str(agent.Status.INIT)) + + # Write an unexpected random number. + form = {"status": random.randint(50, 99)} + js = self.post_form("status", form, 400) + assert js["message"] == "No valid status has been provided" + _ = self.confirm_status(str(agent.Status.INIT)) + + def test_logs(self): + """Test that the agent responds to a request for the logs.""" + # r = app.test_client().get(f"{BASE_URL}/logs") + r = app.test_client().get(f"{BASE_URL}/logs") + assert r.status_code == 200 + js = r.json + assert js["message"] == "Agent logs" + assert "stdout" in js + assert "stderr" in js + + def test_system(self): + """Test that the agent responds to a request for the system/platform.""" + # r = app.test_client().get(f"{BASE_URL}/system") + r = app.test_client().get(f"{BASE_URL}/system") + assert r.status_code == 200 + js = r.json + assert js["message"] == "System" + assert "system" in js + if sys.platform == "win32": + assert js["system"] == "Windows" + else: + assert js["system"] == "Linux" + + def test_environ(self): + """Test that the agent responds to a request for the environment.""" + r = app.test_client().get(f"{BASE_URL}/environ") + assert r.status_code == 200 + js = r.json + assert js["message"] == "Environment variables" + assert "environ" in js + + def test_path(self): + """Test that the agent responds to a request for its path.""" + r = app.test_client().get(f"{BASE_URL}/path") + assert r.status_code == 200 + js = r.json + assert js["message"] == "Agent path" + assert "filepath" in js + assert os.path.isfile(js["filepath"]) + + def test_mkdir_valid(self): + """Test that the agent creates a directory.""" + new_dir = os.path.join(DIRPATH, make_temp_name()) + form = { + "dirpath": new_dir, + "mode": 0o777, + } + js = self.post_form("mkdir", form) + assert js["message"] == "Successfully created directory" + assert os.path.exists(new_dir) + assert os.path.isdir(new_dir) + + def test_mkdir_missing(self): + """Ensure we get an error returned when the mkdir request fails.""" + form = {} + js = self.post_form("mkdir", form, 400) + assert js["message"] == "No dirpath has been provided" + + @pytest.mark.skip("Not many paths are actually invalid") + def test_mkdir_invalid(self): + """Ensure we get an error returned when the mkdir request fails.""" + # TODO come up with an invalid directory path for windows / linux + invalid = "" + form = {"dirpath": invalid, "mode": 0o777} + js = self.post_form("mkdir", form, 500) + assert js["message"] == "Error creating directory" + + def test_mktemp_valid(self): + form = { + "dirpath": DIRPATH, + "prefix": make_temp_name(), + "suffix": "tmp", + } + js = self.post_form("mktemp", form) + assert js["message"] == "Successfully created temporary file" + # tempfile.mkstemp adds random characters to suffix, so returned name + # will be different + assert "filepath" in js and js["filepath"].startswith(os.path.join(form["dirpath"], form["prefix"])) + assert os.path.exists(js["filepath"]) + assert os.path.isfile(js["filepath"]) + + def test_mktemp_invalid(self): + """Ensure we get an error returned when the mktemp request fails.""" + dirpath = self.non_existent_directory() + form = { + "dirpath": dirpath, + "prefix": "", + "suffix": "", + } + js = self.post_form("mktemp", form, 500) + assert js["message"] == "Error creating temporary file" + + def test_mkdtemp_valid(self): + """Ensure we can use the mkdtemp endpoint.""" + form = { + "dirpath": DIRPATH, + "prefix": make_temp_name(), + "suffix": "tmp", + } + js = self.post_form("mkdtemp", form) + assert js["message"] == "Successfully created temporary directory" + # tempfile.mkdtemp adds random characters to suffix, so returned name + # will be different + assert "dirpath" in js and js["dirpath"].startswith(os.path.join(form["dirpath"], form["prefix"])) + assert os.path.exists(js["dirpath"]) + assert os.path.isdir(js["dirpath"]) + + def test_mkdtemp_invalid(self): + """Ensure we get an error returned when the mkdtemp request fails.""" + dirpath = self.non_existent_directory() + assert not dirpath.exists() + form = { + "dirpath": dirpath, + "prefix": "", + "suffix": "", + } + js = self.post_form("mkdtemp", form, 500) + assert js["message"] == "Error creating temporary directory" + + def test_store(self): + sample_text = make_temp_name() + + with tempfile.NamedTemporaryFile(delete=False, mode='w') as tmp: + tmp.write(os.linesep.join(("test data", sample_text, "test data"))) + form = {"filepath": tmp.name, "file": tmp.name} + + js = self.post_form("store", form) + assert js["message"] == "Successfully stored file" + assert os.path.exists(form["filepath"]) + assert os.path.isfile(form["filepath"]) + assert self.file_contains(form["filepath"], sample_text) + + def test_store_invalid(self): + # missing file + form = {"filepath": os.path.join(DIRPATH, make_temp_name() + ".tmp")} + js = self.post_form("store", form, 400) + assert js["message"] == "No file has been provided" + + # missing filepath + with tempfile.NamedTemporaryFile(delete=False, mode='w') as tmp: + tmp.write("test data\ntest data\n") + upload_file = {"file": tmp.name} + js = self.post_form("store", {}, 400, files=upload_file) + assert js["message"] == "No filepath has been provided" + + # destination file path is invalid + with tempfile.NamedTemporaryFile(delete=False, mode='w') as tmp: + tmp.write("test data\ntest data\n") + upload_file = {"file": tmp.name} + form = {"filepath": os.path.join(DIRPATH, make_temp_name(), "tmp")} + js = self.post_form("store", form, 500, files=upload_file) + assert js["message"].startswith("Error storing file") + + def test_retrieve(self): + """Create a file, then try to retrieve it.""" + first_line = make_temp_name() + last_line = make_temp_name() + file_contents = os.linesep.join((first_line, "test data", last_line)) + # file_path = os.path.join(DIRPATH, make_temp_name() + ".tmp") + with tempfile.NamedTemporaryFile(delete=False) as tmp: + self.create_file(tmp.name, file_contents) + form = {"filepath": tmp.name} + # Can't use self.post_form here as no json will be returned. + r = app.test_client().post(f"{BASE_URL}/retrieve", data=form) + # assert r.status_code == 200 + assert file_contents in r.data + # assert last_line.encode() in r.data + # Also test the base64-encoded retrieval. + """ToDo base64 not supported yet + form["encoding"] = "base64" + r = app.test_client().post(f"{BASE_URL}/retrieve", data=form) + # assert r.status_code == 200 + decoded = base64.b64decode(r.data + b"==").decode() + assert "test data" in decoded + assert first_line in decoded + assert last_line in decoded + """ + + def test_retrieve_invalid(self): + js = self.post_form("retrieve", {}, 400) + assert js["message"].startswith("No filepath has been provided") + + # request to retrieve non existent file + form = {"filepath": os.path.join(DIRPATH, make_temp_name() + ".tmp")} + # Can't use self.post_form here as no json will be returned. + r = app.test_client().post(f"{BASE_URL}/retrieve", data=form) + assert r.status_code == 404 + + def test_extract(self): + """Create a file zip file, then upload and extract the contents.""" + file_dir = make_temp_name() + file_name = make_temp_name() + file_contents = make_temp_name() + zfile = io.BytesIO() + zf = zipfile.ZipFile(zfile, "w", zipfile.ZIP_DEFLATED, False) + zf.writestr(os.path.join(file_dir, file_name), file_contents) + zf.close() + zfile.seek(0) + + with tempfile.NamedTemporaryFile(delete=False) as tmp: + tmp.write(zfile.read()) + upload_file = {"zipfile": tmp.name} + form = {"dirpath": DIRPATH} + + js = self.post_form("extract", form, files=upload_file) + assert js["message"] == "Successfully extracted zip file" + expected_path = os.path.join(DIRPATH, file_dir, file_name) + assert os.path.exists(expected_path) + assert self.file_contains(expected_path, file_contents) + + # todo should I check the filesytem for the file? + + def test_extract_invalid(self): + form = {"dirpath": DIRPATH} + js = self.post_form("extract", form, 400) + assert js["message"] == "No zip file has been provided" + + with tempfile.NamedTemporaryFile(delete=False, mode='w') as tmp: + tmp.write("dummy data") + upload_file = {"zipfile": tmp.name} + js = self.post_form("extract", {}, 400, files=upload_file) + assert js["message"] == "No dirpath has been provided" + + def test_remove(self): + tempdir = os.path.join(DIRPATH, make_temp_name()) + tempfile = os.path.join(tempdir, make_temp_name()) + os.mkdir(tempdir, 0o777) + self.create_file(tempfile, "test data\ntest data\n") + + # delete temp file + form = {"path": tempfile} + js = self.post_form("remove", form) + assert js["message"] == "Successfully deleted file" + + # delete temp directory + form = {"path": tempdir} + js = self.post_form("remove", form) + assert js["message"] == "Successfully deleted directory" + + def test_remove_invalid(self): + tempdir = os.path.join(DIRPATH, make_temp_name()) + + # missing parameter + form = {} + js = self.post_form("remove", form, 400) + assert js["message"] == "No path has been provided" + + # path doesn't exist + form = {"path": tempdir} + js = self.post_form("remove", form, 404) + assert js["message"] == "Path provided does not exist" + + @pytest.mark.skipif(agent.isAdmin(), reason="Test fails if privileges are elevated.") + def test_remove_system_temp_dir(self): + # error removing file or dir (permission) + form = {"path": tempfile.gettempdir()} + js = self.post_form("remove", form, 500) + assert js["message"] == "Error removing file or directory" + + def test_async_running(self): + """Test async execution shows as running after starting.""" + # upload test python file + file_contents = ( + f"# Comment a random number {random.randint(1000, 9999)}'", + "import sys", + "import time", + "print('hello world')", + "print('goodbye world', file=sys.stderr)", + "time.sleep(1)", + "sys.exit(0)", + ) + with tempfile.NamedTemporaryFile(delete=False, mode='w') as tmp: + filepath = self.store_file(tmp, file_contents) + form = {"filepath": filepath, "async": 1} + js = self.post_form("execpy", form) + assert js["message"] == "Successfully spawned command" + assert "stdout" not in js + assert "stderr" not in js + assert "process_id" in js + _ = self.confirm_status(str(agent.Status.RUNNING)) + + def test_async_complete(self): + """Test async execution shows as complete after exiting.""" + # upload test python file + file_contents = ( + f"# Comment a random number {random.randint(1000, 9999)}'", + "import sys", + "print('hello world')", + "sys.exit(0)", + ) + with tempfile.NamedTemporaryFile(delete=False, mode='w') as tmp: + filepath = self.store_file(tmp, file_contents) + form = {"filepath": filepath, "async": 1} + + js = self.post_form("execpy", form) + assert js["message"] == "Successfully spawned command" + # sleep a moment to let it finish + time.sleep(1) + _ = self.confirm_status(str(agent.Status.COMPLETE)) + + def test_async_failure(self): + """Test that an unsuccessful script gets a status of 'failed'.""" + # upload test python file. It will sleep, then try to import a nonexistent module. + file_contents = ( + f"# Comment a random number {random.randint(1000, 9999)}'", + "import sys", + "import time", + "time.sleep(1)", + "import nonexistent", + "print('hello world')", + "print('goodbye world', file=sys.stderr)", + "sys.exit(0)", + ) + + with tempfile.NamedTemporaryFile(delete=False, mode='w') as tmp: + filepath = self.store_file(tmp, file_contents) + form = {"filepath": filepath, "async": 1} + + js = self.post_form("execpy", form) + assert js["message"] == "Successfully spawned command" + assert "stdout" not in js + assert "stderr" not in js + assert "process_id" in js + js = self.confirm_status(str(agent.Status.RUNNING)) + assert "process_id" in js + time.sleep(2) + + js = self.confirm_status(str(agent.Status.FAILED)) + assert "process_id" not in js + + def test_execute(self): + """Test executing the 'date' command.""" + if sys.platform == "win32": + form = {"command": "cmd /c date /t"} + else: + form = {"command": "date"} + js = self.post_form("execute", form) + assert js["message"] == "Successfully executed command" + assert "stdout" in js + assert "stderr" in js + current_year = datetime.date.today().isoformat() + assert current_year[:4] in js["stdout"] + + def test_execute_error(self): + """Expect an error on invalid command to execute.""" + js = self.post_form("execute", {}, 400) + assert js["message"] == "No command has been provided" + + form = {"command": "ls"} + js = self.post_form("execute", form, 500) + assert js["message"] == "Not allowed to execute commands" + + def test_execute_py(self): + """Test we can execute a simple python script.""" + # The output line endings are different between linux and Windows. + file_contents = ( + f"# Comment a random number {random.randint(1000, 9999)}'", + "import sys", + "print('hello world')", + "print('goodbye world', file=sys.stderr)", + ) + with tempfile.NamedTemporaryFile(delete=False, mode='w') as tmp: + filepath = self.store_file(tmp, file_contents) + + form = {"filepath": filepath} + js = self.post_form("execpy", form) + assert js["message"] == "Successfully executed command" + assert "stdout" in js and "hello world" in js["stdout"] + assert "stderr" in js and "goodbye world" in js["stderr"] + + def test_execute_py_error_no_file(self): + """Ensure we get a 400 back when there's no file provided.""" + # The agent used to return 200 even in various failure scenarios. + js = self.post_form("execpy", {}, expected_status=400) + assert js["message"] == "No Python file has been provided" + + def test_execute_py_error_nonexistent_file(self): + """Ensure we get a 400 back when a nonexistent filename is provided.""" + filepath = os.path.join(DIRPATH, make_temp_name() + ".py") + form = {"filepath": filepath} + js = self.post_form("execpy", form, expected_status=400) + assert js["message"] == "Error executing python command." + assert "stderr" in js and "No such file or directory" in js["stderr"] + _ = self.confirm_status(str(agent.Status.FAILED)) + + def test_execute_py_error_non_zero_exit_code(self): + """Ensure we get a 400 back when there's a non-zero exit code.""" + # Run a python script that exits non-zero. + file_contents = ( + f"# Comment a random number {random.randint(1000, 9999)}'", + "import sys", + "print('hello world')", + "sys.exit(3)", + ) + with tempfile.NamedTemporaryFile(delete=False, mode='w') as tmp: + filepath = self.store_file(tmp, file_contents) + form = {"filepath": filepath} + js = self.post_form("execpy", form, expected_status=400) + assert js["message"] == "Error executing python command." + assert "hello world" in js["stdout"] + _ = self.confirm_status(str(agent.Status.FAILED)) + + def test_pinning(self): + r = app.test_client().get(f"{BASE_URL}/pinning") + assert r.status_code == 200 + js = r.json + assert js["message"] == "Successfully pinned Agent" + assert "client_ip" in js + + # Pinning again causes an error. + r = app.test_client().get(f"{BASE_URL}/pinning") + assert r.status_code == 500 + js = r.json + assert js["message"] == "Agent has already been pinned to an IP!"