ビットコインの仕組みをpythonで実装してみる #4 「Networkの形成、マイニングによるブロック作成(前編)」

スポンサーリンク
Uncategorized

はじめに

前回までで、トランザクションのやり取りまでデモを実施しました。

複数ノードがHTTPで通信し、トランザクションを共有し、PoWでブロックをマイニングするミニBitcoinを試してみたいと思います。

以前の記事で全体像を載せましたが、今回の記事では以下の部分を確認してみます。

まずはコードの全体像

コードの全体像は以下のようになっています。(各関数については後述しますので、軽く読み飛ばしてしまってください。)

import argparse
import hashlib
import json
import threading
import time
import uuid
import urllib.request
from dataclasses import dataclass, asdict
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import List, Dict, Any

# 現在時刻を「HH:MM:SS」形式の文字列で返す
def now_ts() -> str:
	return time.strftime("%H:%M:%S")

def now_ms() -> float:
	return time.perf_counter() * 1000.0

# ノード名付きでログを標準出力に出す
def log(node: str, message: str) -> None:
	print(f"[{now_ts()}] {node}: {message}", flush=True)

# コマンドラインから渡されたカンマ区切りのピア一覧文字列をURLリストに変換する
def parse_peers(peers_raw: str) -> list[str]:
	if not peers_raw:
		return []
	parts = [p.strip() for p in peers_raw.split(",") if p.strip()]
	return parts

# 指定URLにJSONをPOSTする簡易ヘルパー
def post_json(url: str, payload: dict, timeout: float = 2.0) -> None:
	start_ms = now_ms()
	data = json.dumps(payload).encode("utf-8")
	req = urllib.request.Request(
		url, data=data, headers={"Content-Type": "application/json"}
	)
	with urllib.request.urlopen(req, timeout=timeout) as _:
		_ = None
	end_ms = now_ms()
	if payload.get("origin"):
		log(payload.get("origin"), f"post_json -> {url} in {end_ms - start_ms:.2f} ms")
	return None

# 指定URLからJSONレスポンスを取得する簡易ヘルパー
def get_json(url: str, timeout: float = 2.0) -> dict:
	start_ms = now_ms()
	with urllib.request.urlopen(url, timeout=timeout) as resp:
		data = json.loads(resp.read().decode("utf-8"))
	end_ms = now_ms()
	log("client", f"get_json <- {url} in {end_ms - start_ms:.2f} ms")
	return data

# Bitcoinと同様に2回SHA256をかけたハッシュ値(16進文字列)を返す
def sha256d(data: bytes) -> str:
	return hashlib.sha256(hashlib.sha256(data).digest()).hexdigest()

# トランザクションIDのリストからメルクルルートを計算する
def compute_merkle_root(txs: List[Dict[str, Any]]) -> str:
	start_ms = now_ms()
	if not txs:
		root = "0" * 64
		log("merkle", f"computed merkle root (empty) in {now_ms() - start_ms:.2f} ms")
		return root
	hashes = [tx["txid"] for tx in txs]
	while len(hashes) > 1:
		if len(hashes) % 2 == 1:
			hashes.append(hashes[-1])
		new_hashes = []
		for i in range(0, len(hashes), 2):
			pair = (hashes[i] + hashes[i + 1]).encode("utf-8")
			new_hashes.append(sha256d(pair))
		hashes = new_hashes
	root = hashes[0]
	log("merkle", f"computed merkle root for {len(txs)} txs in {now_ms() - start_ms:.2f} ms")
	return root

@dataclass
class Block:
	index: int
	prev_hash: str
	timestamp: float
	nonce: int
	difficulty: int
	merkle_root: str
	txs: List[Dict[str, Any]]
	hash: str


# ブロックヘッダ相当の情報を1本のバイト列にシリアライズする
def block_header_bytes(block: "Block") -> bytes:
	parts = [
		str(block.index),
		block.prev_hash,
		str(block.timestamp),
		str(block.nonce),
		str(block.difficulty),
		block.merkle_root,
	]
	return "|".join(parts).encode("utf-8")


def format_block_summary(block: "Block") -> str:
	return (
		f"latest block index={block.index} "
		f"hash={block.hash[:8]}... "
		f"prev={block.prev_hash[:8]}... "
		f"txs={len(block.txs)} nonce={block.nonce} "
		f"merkle={block.merkle_root[:8]}..."
	)


# 指定difficultyを満たすブロックハッシュが見つかるまでナンスを変えながら探索する
def mine_block(index: int, prev_hash: str, txs: List[Dict[str, Any]], difficulty: int) -> Block:
	start_ms = now_ms()
	merkle_root = compute_merkle_root(txs)
	nonce = 0
	target_prefix = "0" * difficulty
	while True:
		timestamp = time.time()
		head = Block(
			index=index,
			prev_hash=prev_hash,
			timestamp=timestamp,
			nonce=nonce,
			difficulty=difficulty,
			merkle_root=merkle_root,
			txs=txs,
			hash="",
		)
		bh = block_header_bytes(head)
		block_hash = sha256d(bh)
		if block_hash.startswith(target_prefix):
			elapsed_ms = now_ms() - start_ms
			log("miner", f"mine_block success nonce={nonce} in {elapsed_ms:.2f} ms")
			return Block(
				index=index,
				prev_hash=prev_hash,
				timestamp=timestamp,
				nonce=nonce,
				difficulty=difficulty,
				merkle_root=merkle_root,
				txs=txs,
				hash=block_hash,
			)
		nonce += 1
		if nonce % 10000 == 0:
			# 一定回数ごとに中断して「別ノードから新しいブロックが届いた場合」に備える
			elapsed_ms = now_ms() - start_ms
			log("miner", f"mine_block pause nonce={nonce} in {elapsed_ms:.2f} ms")
			return None

class BlockchainNodeState:
	def __init__(self, name: str, peers: list[str], difficulty: int = 4):
		self.name = name
		self.peers = peers
		self.difficulty = difficulty
		self.mempool: list[dict] = []
		self.seen_txids: set[str] = set()
		self.chain: list[Block] = []
		self.lock = threading.Lock()
		self.mining = False
		self.mempool_first_seen_ms: float | None = None

	# 最初の1ブロック(ジェネシスブロック)を作成する
	def init_genesis(self) -> None:
		with self.lock:
			if self.chain:
				return
			genesis_tx = {
				"txid": "GENESIS",
				"origin": self.name,
				"from": "network",
				"to": self.name,
				"amount": 0,
				"fee": 0,
				"ttl": 0,
				"ts": now_ts(),
			}
			block = Block(
				index=0,
				prev_hash="0" * 64,
				timestamp=time.time(),
				nonce=0,
				difficulty=self.difficulty,
				merkle_root=compute_merkle_root([genesis_tx]),
				txs=[genesis_tx],
				hash="GENESIS",
			)
			self.chain.append(block)
			self.seen_txids.add("GENESIS")
			log(self.name, "created genesis block")

	# 現在のチェーン先頭(最新ブロック)を返す
	def tip(self) -> Block:
		with self.lock:
			return self.chain[-1]

	# 新しいトランザクションをメモリプールに追加する(重複は無視)
	def add_tx(self, tx: dict) -> bool:
		txid = tx.get("txid")
		if not txid:
			return False
		with self.lock:
			if txid in self.seen_txids:
				return False
			if not self.mempool:
				self.mempool_first_seen_ms = now_ms()
			self.seen_txids.add(txid)
			self.mempool.append(tx)
			return True

	# メモリプールから最大max_count件のトランザクションを取り出す
	def take_mempool_batch(self, max_count: int = 10) -> list[dict]:
		with self.lock:
			if not self.mempool:
				return []
			batch = self.mempool[:max_count]
			self.mempool = self.mempool[max_count:]
			return batch

	# 新しいブロックをチェーンの末尾に追加する(prev_hashが一致しないと拒否)
	def add_block(self, block: Block) -> bool:
		with self.lock:
			if not self.chain:
				# first block must be genesis-like
				self.chain.append(block)
				log(self.name, format_block_summary(block))
				return True
			current_tip = self.chain[-1]
			if block.prev_hash != current_tip.hash:
				return False
			self.chain.append(block)
			# remove included txs from mempool
			included = {tx["txid"] for tx in block.txs if "txid" in tx}
			self.mempool = [tx for tx in self.mempool if tx.get("txid") not in included]
			log(self.name, format_block_summary(block))
			self.mempool_first_seen_ms = None
			return True

	# APIレスポンス用に現在の状態をディクショナリで返す
	def snapshot(self) -> dict:
		with self.lock:
			return {
				"name": self.name,
				"peers": list(self.peers),
				"mempool": list(self.mempool),
				"chain": [asdict(b) for b in self.chain],
			}

# 受信したトランザクションをTTLが0になるまでピアに転送する
def forward_tx(state: BlockchainNodeState, tx: dict) -> None:
	ttl = tx.get("ttl", 0)
	if ttl <= 0:
		return
	next_tx = dict(tx)
	next_tx["ttl"] = ttl - 1
	for peer in state.peers:
		try:
			start_ms = now_ms()
			post_json(f"{peer}/tx", next_tx)
			log(state.name, f"forward_tx -> {peer} in {now_ms() - start_ms:.2f} ms")
		except Exception:
			continue

# 受信または生成したブロックをすべてのピアにブロードキャストする
def forward_block(state: BlockchainNodeState, block: Block) -> None:
	payload = asdict(block)
	for peer in state.peers:
		try:
			start_ms = now_ms()
			post_json(f"{peer}/block", payload)
			log(state.name, f"forward_block -> {peer} in {now_ms() - start_ms:.2f} ms")
		except Exception:
			continue


# 各エンドポイントに対応するHTTPハンドラ(GET/POST)を生成する
def make_handler(state: BlockchainNodeState):
	class Handler(BaseHTTPRequestHandler):
		def do_GET(self) -> None:
			if self.path == "/mempool":
				body = json.dumps(state.snapshot()["mempool"]).encode("utf-8")
				self.send_response(200)
				self.send_header("Content-Type", "application/json")
				self.send_header("Content-Length", str(len(body)))
				self.end_headers()
				try:
					self.wfile.write(body)
				except BrokenPipeError:
					return
				return
			if self.path == "/chain":
				body = json.dumps(state.snapshot()["chain"]).encode("utf-8")
				self.send_response(200)
				self.send_header("Content-Type", "application/json")
				self.send_header("Content-Length", str(len(body)))
				self.end_headers()
				try:
					self.wfile.write(body)
				except BrokenPipeError:
					return
				return
			self.send_response(404)
			self.end_headers()

		def do_POST(self) -> None:
			if self.path == "/tx":
				start_ms = now_ms()
				length = int(self.headers.get("Content-Length", "0"))
				raw = self.rfile.read(length) if length else b"{}"
				tx = json.loads(raw.decode("utf-8"))
				added = state.add_tx(tx)
				if added:
					log(state.name, f"received txid={tx['txid']} from={tx.get('origin')}")
					forward_tx(state, tx)
				self.send_response(200)
				try:
					self.end_headers()
				except BrokenPipeError:
					return
				log(state.name, f"handle /tx in {now_ms() - start_ms:.2f} ms")
				return
			if self.path == "/block":
				start_ms = now_ms()
				length = int(self.headers.get("Content-Length", "0"))
				raw = self.rfile.read(length) if length else b"{}"
				b = json.loads(raw.decode("utf-8"))
				try:
					block = Block(**b)
				except TypeError:
					self.send_response(400)
					self.end_headers()
					return
				ok = state.add_block(block)
				if ok:
					log(state.name, f"accepted block height={block.index} hash={block.hash[:8]}...")
					forward_block(state, block)
				self.send_response(200)
				try:
					self.end_headers()
				except BrokenPipeError:
					return
				log(state.name, f"handle /block in {now_ms() - start_ms:.2f} ms")
				return
			self.send_response(404)
			self.end_headers()

		def log_message(self, format: str, *args) -> None:
			return

	return Handler

# メモリプールのトランザクションを定期的に取り出し、ブロックをマイニングするループを開始する
def start_mining_loop(state: BlockchainNodeState, max_txs_per_block: int = 10, interval: float = 1.0) -> threading.Thread:
	def loop() -> None:
		state.mining = True
		log(state.name, "mining loop started")
		while True:
			loop_start_ms = now_ms()
			batch = state.take_mempool_batch(max_txs_per_block)
			if not batch:
				time.sleep(interval)
				continue
			with state.lock:
				index = len(state.chain)
				prev_hash = state.chain[-1].hash if state.chain else "0" * 64
			result = mine_block(index, prev_hash, batch, state.difficulty)
			if result is None:
				# aborted due to nonce threshold; re-queue txs
				with state.lock:
					state.mempool = batch + state.mempool
				log(state.name, f"mining loop aborted batch in {now_ms() - loop_start_ms:.2f} ms")
				continue
			added = state.add_block(result)
			if added:
				log(state.name, f"mined new block height={result.index} hash={result.hash[:8]}...")
				forward_block(state, result)
				log(state.name, f"mining loop complete in {now_ms() - loop_start_ms:.2f} ms")
				with state.lock:
					start_ms = state.mempool_first_seen_ms
				if start_ms is not None:
					log(state.name, f"end-to-end from first tx to block in {now_ms() - start_ms:.2f} ms")

	t = threading.Thread(target=loop, daemon=True)
	t.start()
	return t

# 単一ノードを起動し、必要ならマイニングループも開始する
def run_node(name: str, port: int, peers: list[str], miner: bool, difficulty: int) -> None:
	state = BlockchainNodeState(name=name, peers=peers, difficulty=difficulty)
	state.init_genesis()
	server = ThreadingHTTPServer(("0.0.0.0", port), make_handler(state))
	log(name, f"listening on 0.0.0.0:{port}, miner={miner}")
	if miner:
		start_mining_loop(state)
	server.serve_forever()

# コマンドライン引数をパースしてrun_nodeを呼び出すエントリポイント
def main() -> None:
	parser = argparse.ArgumentParser(description="Simple Bitcoin-like blockchain network demo")
	parser.add_argument("--name", default="node", help="node name")
	parser.add_argument("--port", type=int, default=9101, help="listen port")
	parser.add_argument("--peers", default="", help="comma-separated peer base URLs (e.g. http://node02:9102)")
	parser.add_argument("--miner", action="store_true", help="enable mining loop")
	parser.add_argument("--difficulty", type=int, default=4, help="POW difficulty (number of leading zeros)")
	args = parser.parse_args()

	peers = parse_peers(args.peers)
	run_node(args.name, args.port, peers, args.miner, args.difficulty)


if __name__ == "__main__":
	main()

以下関数は処理のサポート用の関数なので、上記コメントアウトの内容通りです。

  • now_ts
  • now_ms
  • log
  • parse_peers
  • post_json
  • get_json
  • sha256d

メルクルルート計算

メルクルルートはブロックチェーンでブロックに入っている全トランザクションを1つのハッシュにまとめる仕組みのことを指します。

def compute_merkle_root(txs: List[Dict[str, Any]]) -> str:
	start_ms = now_ms()
	if not txs:
		root = "0" * 64
		log("merkle", f"computed merkle root (empty) in {now_ms() - start_ms:.2f} ms")
		return root
	hashes = [tx["txid"] for tx in txs]
	while len(hashes) > 1:
		if len(hashes) % 2 == 1:
			hashes.append(hashes[-1])
		new_hashes = []
		for i in range(0, len(hashes), 2):
			pair = (hashes[i] + hashes[i + 1]).encode("utf-8")
			new_hashes.append(sha256d(pair))
		hashes = new_hashes
	root = hashes[0]
	log("merkle", f"computed merkle root for {len(txs)} txs in {now_ms() - start_ms:.2f} ms")
	return root

ハッシュ化したトランザクションをまとめて、メルクルツリーにします。

以下がイメージ図です。

merkle_root = hash(h12 + h34)

        merkle_root
         /      \
      h12        h34
     /  \       /  \
   h1   h2    h3   h4

メルクルツリーは検証を高速にするために行っています。

トランザクション全部
↓
1つのハッシュに圧縮
↓
merkle_root

としたmerkle_rootをブロックには入れます。

トランザクションが改ざんされた場合、ブロックのハッシュもすべて変わるため改ざんがばれることになります。

今回のコードではメルクルルートはrootという値で返り値として返されています。

マイニング(PoW)

マイニングのコードはmine_block関数に記載しています。

def mine_block(index: int, prev_hash: str, txs: List[Dict[str, Any]], difficulty: int) -> Block:
	start_ms = now_ms()
	merkle_root = compute_merkle_root(txs)
	nonce = 0
	target_prefix = "0" * difficulty
	while True:
		timestamp = time.time()
		head = Block(
			index=index,
			prev_hash=prev_hash,
			timestamp=timestamp,
			nonce=nonce,
			difficulty=difficulty,
			merkle_root=merkle_root,
			txs=txs,
			hash="",
		)
		bh = block_header_bytes(head)
		block_hash = sha256d(bh)
		if block_hash.startswith(target_prefix):
			elapsed_ms = now_ms() - start_ms
			log("miner", f"mine_block success nonce={nonce} in {elapsed_ms:.2f} ms")
			return Block(
				index=index,
				prev_hash=prev_hash,
				timestamp=timestamp,
				nonce=nonce,
				difficulty=difficulty,
				merkle_root=merkle_root,
				txs=txs,
				hash=block_hash,
			)
		nonce += 1
		if nonce % 10000 == 0:
			# 一定回数ごとに中断して「別ノードから新しいブロックが届いた場合」に備える
			elapsed_ms = now_ms() - start_ms
			log("miner", f"mine_block pause nonce={nonce} in {elapsed_ms:.2f} ms")
			return None

簡易版Proof of Work(PoW)を実装しています。

PoW(Proof of Work)とは

nonce(number used once)という一度だけ使う数をひたすら変えながらハッシュを試して、「ちゃんと計算を大量にやったよ」という証明をすることです。

例えば、difficulty = 4の場合、「0000xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx」という0000で始まるハッシュを探します。

nonceを変える
↓
ハッシュ計算
↓
0000で始まるハッシュを探す

という流れです。

これをnanceを変えながら繰り返します。

nonce = 1 → 3f82ab...
nonce = 2 → a8d921...
nonce = 3 → 9c2231...
・・・
nonce = 452893 → 0000f92a...

という感じ

これで大変な計算をしたことが証明されます。

でも、そもそも、なんでこんな大変な計算をしたってことを証明しないといけないのでしょうか?

これにはブロックチェーンの中央管理者がいないという特徴によります。

もし、PoWがなければだれが正しいブロックを作るかがわからないです。

ノードAもノードBもノードCもブロックを作成したといってくることになりかねないですし、この時、だれを信じていいかわかりません。

そこで、ブロックを作るというコストを作ります。

PoWというコストがないと、攻撃者が大量のブロックを作成することもできてしまいます。

さらには、PoWがあることで、ブロックの改ざんがかなり難しくなります。なぜなら、ブロックはいくつも連なっていて、一つのブロックを変えるとそれ以外のブロックも作り直さないといけません。

しかしPoWがあることで、作り直す度にPoWをしなくてはいけなくなり、世界のマイナー全員の計算量を超える必要があります。

最後に

少しずつですがブロックチェーンの理解が進んできました。

PoWはサトシナカモトの論文の中でも一番のポイントなので押さえておきたいところです。

タイトルとURLをコピーしました