はじめに
前回までで、トランザクションのやり取りまでデモを実施しました。
複数ノードがHTTPで通信し、トランザクションを共有し、PoWでブロックをマイニングするミニBitcoinを試してみたいと思います。
以前の記事で全体像を載せましたが、今回の記事では以下の部分を確認してみます。

まずはコードの全体像
コードの全体像は以下のようになっています。(各関数については後述しますので、軽く読み飛ばしてしまってください。)
import argparse
import hashlib
import json
import threading
import time
import uuid
import urllib.request
from dataclasses import dataclass, asdict
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import List, Dict, Any
# 現在時刻を「HH:MM:SS」形式の文字列で返す
def now_ts() -> str:
return time.strftime("%H:%M:%S")
def now_ms() -> float:
return time.perf_counter() * 1000.0
# ノード名付きでログを標準出力に出す
def log(node: str, message: str) -> None:
print(f"[{now_ts()}] {node}: {message}", flush=True)
# コマンドラインから渡されたカンマ区切りのピア一覧文字列をURLリストに変換する
def parse_peers(peers_raw: str) -> list[str]:
if not peers_raw:
return []
parts = [p.strip() for p in peers_raw.split(",") if p.strip()]
return parts
# 指定URLにJSONをPOSTする簡易ヘルパー
def post_json(url: str, payload: dict, timeout: float = 2.0) -> None:
start_ms = now_ms()
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
url, data=data, headers={"Content-Type": "application/json"}
)
with urllib.request.urlopen(req, timeout=timeout) as _:
_ = None
end_ms = now_ms()
if payload.get("origin"):
log(payload.get("origin"), f"post_json -> {url} in {end_ms - start_ms:.2f} ms")
return None
# 指定URLからJSONレスポンスを取得する簡易ヘルパー
def get_json(url: str, timeout: float = 2.0) -> dict:
start_ms = now_ms()
with urllib.request.urlopen(url, timeout=timeout) as resp:
data = json.loads(resp.read().decode("utf-8"))
end_ms = now_ms()
log("client", f"get_json <- {url} in {end_ms - start_ms:.2f} ms")
return data
# Bitcoinと同様に2回SHA256をかけたハッシュ値(16進文字列)を返す
def sha256d(data: bytes) -> str:
return hashlib.sha256(hashlib.sha256(data).digest()).hexdigest()
# トランザクションIDのリストからメルクルルートを計算する
def compute_merkle_root(txs: List[Dict[str, Any]]) -> str:
start_ms = now_ms()
if not txs:
root = "0" * 64
log("merkle", f"computed merkle root (empty) in {now_ms() - start_ms:.2f} ms")
return root
hashes = [tx["txid"] for tx in txs]
while len(hashes) > 1:
if len(hashes) % 2 == 1:
hashes.append(hashes[-1])
new_hashes = []
for i in range(0, len(hashes), 2):
pair = (hashes[i] + hashes[i + 1]).encode("utf-8")
new_hashes.append(sha256d(pair))
hashes = new_hashes
root = hashes[0]
log("merkle", f"computed merkle root for {len(txs)} txs in {now_ms() - start_ms:.2f} ms")
return root
@dataclass
class Block:
index: int
prev_hash: str
timestamp: float
nonce: int
difficulty: int
merkle_root: str
txs: List[Dict[str, Any]]
hash: str
# ブロックヘッダ相当の情報を1本のバイト列にシリアライズする
def block_header_bytes(block: "Block") -> bytes:
parts = [
str(block.index),
block.prev_hash,
str(block.timestamp),
str(block.nonce),
str(block.difficulty),
block.merkle_root,
]
return "|".join(parts).encode("utf-8")
def format_block_summary(block: "Block") -> str:
return (
f"latest block index={block.index} "
f"hash={block.hash[:8]}... "
f"prev={block.prev_hash[:8]}... "
f"txs={len(block.txs)} nonce={block.nonce} "
f"merkle={block.merkle_root[:8]}..."
)
# 指定difficultyを満たすブロックハッシュが見つかるまでナンスを変えながら探索する
def mine_block(index: int, prev_hash: str, txs: List[Dict[str, Any]], difficulty: int) -> Block:
start_ms = now_ms()
merkle_root = compute_merkle_root(txs)
nonce = 0
target_prefix = "0" * difficulty
while True:
timestamp = time.time()
head = Block(
index=index,
prev_hash=prev_hash,
timestamp=timestamp,
nonce=nonce,
difficulty=difficulty,
merkle_root=merkle_root,
txs=txs,
hash="",
)
bh = block_header_bytes(head)
block_hash = sha256d(bh)
if block_hash.startswith(target_prefix):
elapsed_ms = now_ms() - start_ms
log("miner", f"mine_block success nonce={nonce} in {elapsed_ms:.2f} ms")
return Block(
index=index,
prev_hash=prev_hash,
timestamp=timestamp,
nonce=nonce,
difficulty=difficulty,
merkle_root=merkle_root,
txs=txs,
hash=block_hash,
)
nonce += 1
if nonce % 10000 == 0:
# 一定回数ごとに中断して「別ノードから新しいブロックが届いた場合」に備える
elapsed_ms = now_ms() - start_ms
log("miner", f"mine_block pause nonce={nonce} in {elapsed_ms:.2f} ms")
return None
class BlockchainNodeState:
def __init__(self, name: str, peers: list[str], difficulty: int = 4):
self.name = name
self.peers = peers
self.difficulty = difficulty
self.mempool: list[dict] = []
self.seen_txids: set[str] = set()
self.chain: list[Block] = []
self.lock = threading.Lock()
self.mining = False
self.mempool_first_seen_ms: float | None = None
# 最初の1ブロック(ジェネシスブロック)を作成する
def init_genesis(self) -> None:
with self.lock:
if self.chain:
return
genesis_tx = {
"txid": "GENESIS",
"origin": self.name,
"from": "network",
"to": self.name,
"amount": 0,
"fee": 0,
"ttl": 0,
"ts": now_ts(),
}
block = Block(
index=0,
prev_hash="0" * 64,
timestamp=time.time(),
nonce=0,
difficulty=self.difficulty,
merkle_root=compute_merkle_root([genesis_tx]),
txs=[genesis_tx],
hash="GENESIS",
)
self.chain.append(block)
self.seen_txids.add("GENESIS")
log(self.name, "created genesis block")
# 現在のチェーン先頭(最新ブロック)を返す
def tip(self) -> Block:
with self.lock:
return self.chain[-1]
# 新しいトランザクションをメモリプールに追加する(重複は無視)
def add_tx(self, tx: dict) -> bool:
txid = tx.get("txid")
if not txid:
return False
with self.lock:
if txid in self.seen_txids:
return False
if not self.mempool:
self.mempool_first_seen_ms = now_ms()
self.seen_txids.add(txid)
self.mempool.append(tx)
return True
# メモリプールから最大max_count件のトランザクションを取り出す
def take_mempool_batch(self, max_count: int = 10) -> list[dict]:
with self.lock:
if not self.mempool:
return []
batch = self.mempool[:max_count]
self.mempool = self.mempool[max_count:]
return batch
# 新しいブロックをチェーンの末尾に追加する(prev_hashが一致しないと拒否)
def add_block(self, block: Block) -> bool:
with self.lock:
if not self.chain:
# first block must be genesis-like
self.chain.append(block)
log(self.name, format_block_summary(block))
return True
current_tip = self.chain[-1]
if block.prev_hash != current_tip.hash:
return False
self.chain.append(block)
# remove included txs from mempool
included = {tx["txid"] for tx in block.txs if "txid" in tx}
self.mempool = [tx for tx in self.mempool if tx.get("txid") not in included]
log(self.name, format_block_summary(block))
self.mempool_first_seen_ms = None
return True
# APIレスポンス用に現在の状態をディクショナリで返す
def snapshot(self) -> dict:
with self.lock:
return {
"name": self.name,
"peers": list(self.peers),
"mempool": list(self.mempool),
"chain": [asdict(b) for b in self.chain],
}
# 受信したトランザクションをTTLが0になるまでピアに転送する
def forward_tx(state: BlockchainNodeState, tx: dict) -> None:
ttl = tx.get("ttl", 0)
if ttl <= 0:
return
next_tx = dict(tx)
next_tx["ttl"] = ttl - 1
for peer in state.peers:
try:
start_ms = now_ms()
post_json(f"{peer}/tx", next_tx)
log(state.name, f"forward_tx -> {peer} in {now_ms() - start_ms:.2f} ms")
except Exception:
continue
# 受信または生成したブロックをすべてのピアにブロードキャストする
def forward_block(state: BlockchainNodeState, block: Block) -> None:
payload = asdict(block)
for peer in state.peers:
try:
start_ms = now_ms()
post_json(f"{peer}/block", payload)
log(state.name, f"forward_block -> {peer} in {now_ms() - start_ms:.2f} ms")
except Exception:
continue
# 各エンドポイントに対応するHTTPハンドラ(GET/POST)を生成する
def make_handler(state: BlockchainNodeState):
class Handler(BaseHTTPRequestHandler):
def do_GET(self) -> None:
if self.path == "/mempool":
body = json.dumps(state.snapshot()["mempool"]).encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
try:
self.wfile.write(body)
except BrokenPipeError:
return
return
if self.path == "/chain":
body = json.dumps(state.snapshot()["chain"]).encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
try:
self.wfile.write(body)
except BrokenPipeError:
return
return
self.send_response(404)
self.end_headers()
def do_POST(self) -> None:
if self.path == "/tx":
start_ms = now_ms()
length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(length) if length else b"{}"
tx = json.loads(raw.decode("utf-8"))
added = state.add_tx(tx)
if added:
log(state.name, f"received txid={tx['txid']} from={tx.get('origin')}")
forward_tx(state, tx)
self.send_response(200)
try:
self.end_headers()
except BrokenPipeError:
return
log(state.name, f"handle /tx in {now_ms() - start_ms:.2f} ms")
return
if self.path == "/block":
start_ms = now_ms()
length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(length) if length else b"{}"
b = json.loads(raw.decode("utf-8"))
try:
block = Block(**b)
except TypeError:
self.send_response(400)
self.end_headers()
return
ok = state.add_block(block)
if ok:
log(state.name, f"accepted block height={block.index} hash={block.hash[:8]}...")
forward_block(state, block)
self.send_response(200)
try:
self.end_headers()
except BrokenPipeError:
return
log(state.name, f"handle /block in {now_ms() - start_ms:.2f} ms")
return
self.send_response(404)
self.end_headers()
def log_message(self, format: str, *args) -> None:
return
return Handler
# メモリプールのトランザクションを定期的に取り出し、ブロックをマイニングするループを開始する
def start_mining_loop(state: BlockchainNodeState, max_txs_per_block: int = 10, interval: float = 1.0) -> threading.Thread:
def loop() -> None:
state.mining = True
log(state.name, "mining loop started")
while True:
loop_start_ms = now_ms()
batch = state.take_mempool_batch(max_txs_per_block)
if not batch:
time.sleep(interval)
continue
with state.lock:
index = len(state.chain)
prev_hash = state.chain[-1].hash if state.chain else "0" * 64
result = mine_block(index, prev_hash, batch, state.difficulty)
if result is None:
# aborted due to nonce threshold; re-queue txs
with state.lock:
state.mempool = batch + state.mempool
log(state.name, f"mining loop aborted batch in {now_ms() - loop_start_ms:.2f} ms")
continue
added = state.add_block(result)
if added:
log(state.name, f"mined new block height={result.index} hash={result.hash[:8]}...")
forward_block(state, result)
log(state.name, f"mining loop complete in {now_ms() - loop_start_ms:.2f} ms")
with state.lock:
start_ms = state.mempool_first_seen_ms
if start_ms is not None:
log(state.name, f"end-to-end from first tx to block in {now_ms() - start_ms:.2f} ms")
t = threading.Thread(target=loop, daemon=True)
t.start()
return t
# 単一ノードを起動し、必要ならマイニングループも開始する
def run_node(name: str, port: int, peers: list[str], miner: bool, difficulty: int) -> None:
state = BlockchainNodeState(name=name, peers=peers, difficulty=difficulty)
state.init_genesis()
server = ThreadingHTTPServer(("0.0.0.0", port), make_handler(state))
log(name, f"listening on 0.0.0.0:{port}, miner={miner}")
if miner:
start_mining_loop(state)
server.serve_forever()
# コマンドライン引数をパースしてrun_nodeを呼び出すエントリポイント
def main() -> None:
parser = argparse.ArgumentParser(description="Simple Bitcoin-like blockchain network demo")
parser.add_argument("--name", default="node", help="node name")
parser.add_argument("--port", type=int, default=9101, help="listen port")
parser.add_argument("--peers", default="", help="comma-separated peer base URLs (e.g. http://node02:9102)")
parser.add_argument("--miner", action="store_true", help="enable mining loop")
parser.add_argument("--difficulty", type=int, default=4, help="POW difficulty (number of leading zeros)")
args = parser.parse_args()
peers = parse_peers(args.peers)
run_node(args.name, args.port, peers, args.miner, args.difficulty)
if __name__ == "__main__":
main()以下関数は処理のサポート用の関数なので、上記コメントアウトの内容通りです。
- now_ts
- now_ms
- log
- parse_peers
- post_json
- get_json
- sha256d
メルクルルート計算
メルクルルートはブロックチェーンでブロックに入っている全トランザクションを1つのハッシュにまとめる仕組みのことを指します。
def compute_merkle_root(txs: List[Dict[str, Any]]) -> str:
start_ms = now_ms()
if not txs:
root = "0" * 64
log("merkle", f"computed merkle root (empty) in {now_ms() - start_ms:.2f} ms")
return root
hashes = [tx["txid"] for tx in txs]
while len(hashes) > 1:
if len(hashes) % 2 == 1:
hashes.append(hashes[-1])
new_hashes = []
for i in range(0, len(hashes), 2):
pair = (hashes[i] + hashes[i + 1]).encode("utf-8")
new_hashes.append(sha256d(pair))
hashes = new_hashes
root = hashes[0]
log("merkle", f"computed merkle root for {len(txs)} txs in {now_ms() - start_ms:.2f} ms")
return rootハッシュ化したトランザクションをまとめて、メルクルツリーにします。
以下がイメージ図です。
merkle_root = hash(h12 + h34)
merkle_root
/ \
h12 h34
/ \ / \
h1 h2 h3 h4メルクルツリーは検証を高速にするために行っています。
トランザクション全部
↓
1つのハッシュに圧縮
↓
merkle_rootとしたmerkle_rootをブロックには入れます。
トランザクションが改ざんされた場合、ブロックのハッシュもすべて変わるため改ざんがばれることになります。
今回のコードではメルクルルートはrootという値で返り値として返されています。
マイニング(PoW)
マイニングのコードはmine_block関数に記載しています。
def mine_block(index: int, prev_hash: str, txs: List[Dict[str, Any]], difficulty: int) -> Block:
start_ms = now_ms()
merkle_root = compute_merkle_root(txs)
nonce = 0
target_prefix = "0" * difficulty
while True:
timestamp = time.time()
head = Block(
index=index,
prev_hash=prev_hash,
timestamp=timestamp,
nonce=nonce,
difficulty=difficulty,
merkle_root=merkle_root,
txs=txs,
hash="",
)
bh = block_header_bytes(head)
block_hash = sha256d(bh)
if block_hash.startswith(target_prefix):
elapsed_ms = now_ms() - start_ms
log("miner", f"mine_block success nonce={nonce} in {elapsed_ms:.2f} ms")
return Block(
index=index,
prev_hash=prev_hash,
timestamp=timestamp,
nonce=nonce,
difficulty=difficulty,
merkle_root=merkle_root,
txs=txs,
hash=block_hash,
)
nonce += 1
if nonce % 10000 == 0:
# 一定回数ごとに中断して「別ノードから新しいブロックが届いた場合」に備える
elapsed_ms = now_ms() - start_ms
log("miner", f"mine_block pause nonce={nonce} in {elapsed_ms:.2f} ms")
return None簡易版Proof of Work(PoW)を実装しています。
最後に
少しずつですがブロックチェーンの理解が進んできました。
PoWはサトシナカモトの論文の中でも一番のポイントなので押さえておきたいところです。
