"""
Hub protocol implementation for external draughts engines (e.g., Scan).
The Hub protocol is a text-based protocol similar to UCI (from chess),
used by engines like Scan 3.1. See protocol.txt for full specification.
Example usage:
>>> from draughts import StandardBoard
>>> from draughts.hub import HubEngine
>>>
>>> engine = HubEngine("path/to/scan.exe")
>>> engine.start()
>>>
>>> board = StandardBoard()
>>> move = engine.get_best_move(board)
>>> board.push(move)
>>>
>>> engine.quit()
Or using context manager:
>>> with HubEngine("path/to/scan.exe") as engine:
... move = engine.get_best_move(board)
"""
from __future__ import annotations
import re
import subprocess
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional, Generator, Iterable
from loguru import logger
from draughts.boards.base import BaseBoard
from draughts.engines.engine import Engine
from draughts.models import Color, Figure
from draughts.move import Move
# Hub protocol variant names mapping from py-draughts board classes
VARIANT_MAP = {
"Standard (international) checkers": "normal",
"Frisian draughts": "frisian",
# American checkers is 8x8, not supported by Scan (which is 10x10 only)
}
@dataclass
class EngineInfo:
"""Engine identification info from Hub protocol."""
name: str = ""
version: str = ""
author: str = ""
country: str = ""
@dataclass
class EngineParam:
"""Engine parameter declaration from Hub protocol."""
name: str
value: str
param_type: str # "bool", "int", "real", "string", "enum"
min_val: Optional[str] = None
max_val: Optional[str] = None
values: Optional[list[str]] = None # For enum type
@dataclass
class SearchInfo:
"""Search information from 'info' lines during search."""
depth: Optional[int] = None
mean_depth: Optional[float] = None
score: Optional[float] = None
nodes: Optional[int] = None
time: Optional[float] = None
nps: Optional[float] = None
pv: Optional[str] = None
@dataclass
class SearchResult:
"""Result from engine search ('done' line)."""
move: str
ponder: Optional[str] = None
info: SearchInfo = field(default_factory=SearchInfo)
def board_to_hub_position(board: BaseBoard) -> str:
"""
Convert a py-draughts board to Hub protocol position string.
Hub format: 51 characters total
- 1 char for side to move: 'W' or 'B'
- 50 chars for squares in standard order: 'w', 'b', 'W', 'B', 'e'
Args:
board: The board to convert (must be 10x10 = 50 squares)
Returns:
Hub position string like "Wbbbbbbbbbbbbbbbbbbbbeeeeeeeeeewwwwwwwwwwwwwwwwwwww"
Raises:
ValueError: If board is not 50 squares (10x10)
"""
if len(board.position) != 50:
raise ValueError(
f"Hub protocol only supports 10x10 boards (50 squares), "
f"got {len(board.position)} squares"
)
# Side to move
side = "W" if board.turn == Color.WHITE else "B"
# Square mapping: py-draughts Figure values -> Hub chars
# py-draughts: -1=white man, -2=white king, 1=black man, 2=black king, 0=empty
piece_map = {
Figure.WHITE_MAN.value: "w", # -1
Figure.WHITE_KING.value: "W", # -2
Figure.BLACK_MAN.value: "b", # 1
Figure.BLACK_KING.value: "B", # 2
Figure.EMPTY.value: "e", # 0
}
squares = "".join(piece_map[int(sq)] for sq in board.position)
return side + squares
def hub_position_to_board(
position: str, board_class: type[BaseBoard]
) -> BaseBoard:
"""
Parse a Hub protocol position string into a py-draughts board.
Args:
position: Hub position string (51 chars)
board_class: The Board class to instantiate
Returns:
Board instance with the parsed position
Raises:
ValueError: If position string is invalid
"""
import numpy as np
if len(position) != 51:
raise ValueError(f"Hub position must be 51 chars, got {len(position)}")
side_char = position[0].upper()
if side_char not in ("W", "B"):
raise ValueError(f"Invalid side to move: {side_char}")
turn = Color.WHITE if side_char == "W" else Color.BLACK
# Parse squares
piece_map = {
"w": Figure.WHITE_MAN.value,
"W": Figure.WHITE_KING.value,
"b": Figure.BLACK_MAN.value,
"B": Figure.BLACK_KING.value,
"e": Figure.EMPTY.value,
}
squares = position[1:]
pos_array = np.array([piece_map[c] for c in squares], dtype=np.int8)
return board_class(starting_position=pos_array, turn=turn)
def move_to_hub_notation(move: Move) -> str:
"""
Convert a py-draughts Move to Hub protocol notation.
Hub format:
- Quiet moves: "32-28"
- Captures: "28x19x23" (from x to x captured x captured...)
Note: Hub uses 1-indexed squares.
Args:
move: The Move object to convert
Returns:
Hub move string
"""
if not move.captured_list:
# Simple move: from-to
return f"{move.square_list[0] + 1}-{move.square_list[-1] + 1}"
else:
# Capture: from x to x captured1 x captured2 ...
# Format: start x end x cap1 x cap2 ...
parts = [str(move.square_list[0] + 1), str(move.square_list[-1] + 1)]
parts.extend(str(sq + 1) for sq in move.captured_list)
return "x".join(parts)
def parse_hub_move(
move_str: str,
legal_moves: Iterable[Move],
board: Optional[BaseBoard] = None,
) -> Move:
"""
Parse a Hub protocol move string and match against legal moves.
Args:
move_str: Hub move string like "32-28" or "28x19x23"
legal_moves: Generator of legal moves to match against
board: Optional board for diagnostic info on error
Returns:
Matching Move object from legal_moves
Raises:
ValueError: If move doesn't match any legal move
"""
# Parse the move string
if "-" in move_str:
parts = move_str.split("-")
is_capture = False
elif "x" in move_str:
parts = move_str.split("x")
is_capture = True
else:
raise ValueError(f"Invalid Hub move format: {move_str}")
# Convert to 0-indexed
squares = [int(p) - 1 for p in parts]
start_sq = squares[0]
end_sq = squares[1] # Second element is always destination in Hub format
captured_squares = set(squares[2:]) if is_capture and len(squares) > 2 else None
# Collect legal moves for potential error reporting
legal_moves_list = list(legal_moves)
# Find matching legal move
for legal_move in legal_moves_list:
if legal_move.square_list[0] != start_sq:
continue
if legal_move.square_list[-1] != end_sq:
continue
# For captures, also verify captured squares match
if is_capture:
if not legal_move.captured_list:
continue
if captured_squares and set(legal_move.captured_list) != captured_squares:
continue
return legal_move
# Build detailed error message
legal_moves_str = ", ".join(str(m) for m in legal_moves_list[:20])
if len(legal_moves_list) > 20:
legal_moves_str += f"... ({len(legal_moves_list)} total)"
error_msg = f"No legal move matches Hub move: {move_str}\n"
error_msg += f" Parsed as: start={start_sq + 1}, end={end_sq + 1}"
if captured_squares:
error_msg += f", captures={[s + 1 for s in captured_squares]}"
error_msg += f"\n Legal moves: [{legal_moves_str}]"
if board is not None:
error_msg += f"\n Position FEN: {board.fen}"
error_msg += f"\n Hub position: {board_to_hub_position(board)}"
error_msg += f"\n Turn: {'White' if board.turn == Color.WHITE else 'Black'}"
raise ValueError(error_msg)
def parse_hub_line(line: str) -> tuple[str, dict[str, str]]:
"""
Parse a Hub protocol line into command and arguments.
Hub syntax: <command> <arg>=<val> <arg>=<val> ...
Values can be quoted with double quotes.
Args:
line: Raw line from engine
Returns:
Tuple of (command, {arg: value, ...})
"""
line = line.strip()
if not line:
return "", {}
# Split into tokens, respecting quoted values
tokens = []
current = ""
in_quotes = False
for char in line:
if char == '"':
in_quotes = not in_quotes
elif char == " " and not in_quotes:
if current:
tokens.append(current)
current = ""
else:
current += char
if current:
tokens.append(current)
if not tokens:
return "", {}
command = tokens[0]
args = {}
for token in tokens[1:]:
if "=" in token:
key, _, value = token.partition("=")
args[key] = value
else:
# Flag argument (no value)
args[token] = ""
return command, args
[docs]
class HubEngine(Engine):
"""
Engine wrapper for Hub protocol (used by Scan and similar engines).
This class manages subprocess communication with an external draughts
engine using the Hub protocol (version 2).
Attributes:
path: Path to the engine executable
time_limit: Default time per move in seconds
depth_limit: Maximum search depth (None for no limit)
Example:
>>> engine = HubEngine("scan.exe", time_limit=1.0)
>>> engine.start()
>>> move = engine.get_best_move(board)
>>> engine.quit()
# Or with context manager:
>>> with HubEngine("scan.exe") as engine:
... move = engine.get_best_move(board)
"""
[docs]
def __init__(
self,
path: str | Path,
time_limit: float = 1.0,
depth_limit: Optional[int] = None,
init_timeout: float = 10.0,
):
"""
Initialize Hub engine wrapper.
Args:
path: Path to the engine executable
time_limit: Time limit per move in seconds (default 1.0)
depth_limit: Maximum search depth (None for no limit)
init_timeout: Timeout for engine initialization in seconds
"""
self.path = Path(path)
self.time_limit = time_limit
self.depth_limit = depth_limit
self.init_timeout = init_timeout
self.process: Optional[subprocess.Popen] = None
self.info: EngineInfo = EngineInfo()
self.params: dict[str, EngineParam] = {}
self._variant: Optional[str] = None
self._started = False
def __enter__(self) -> "HubEngine":
"""Context manager entry - starts the engine."""
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Context manager exit - quits the engine."""
self.quit()
[docs]
def start(self) -> None:
"""
Start the engine subprocess and complete initialization handshake.
The Hub protocol initialization:
1. GUI sends "hub"
2. Engine responds with id, param declarations, then "wait"
3. GUI optionally sends set-param commands
4. GUI sends "init"
5. Engine responds with "ready"
Raises:
FileNotFoundError: If engine executable not found
TimeoutError: If engine doesn't respond in time
RuntimeError: If initialization fails
"""
if self._started:
logger.warning("Engine already started")
return
if not self.path.exists():
raise FileNotFoundError(f"Engine not found: {self.path}")
logger.info(f"Starting Hub engine: {self.path}")
# Run from the engine's directory so it can find its data files
engine_dir = self.path.parent.resolve()
self.process = subprocess.Popen(
[str(self.path.resolve()), "hub"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1, # Line buffered
cwd=str(engine_dir), # Run from engine's directory
)
# Send hub command to start initialization
self._send("hub")
# Read engine responses until "wait"
start_time = time.time()
while True:
if time.time() - start_time > self.init_timeout:
self.quit()
raise TimeoutError("Engine initialization timed out")
line = self._read_line(timeout=1.0)
if line is None:
continue
cmd, args = parse_hub_line(line)
if cmd == "id":
self.info.name = args.get("name", "")
self.info.version = args.get("version", "")
self.info.author = args.get("author", "")
self.info.country = args.get("country", "")
logger.info(f"Engine: {self.info.name} {self.info.version}")
elif cmd == "param":
param = EngineParam(
name=args.get("name", ""),
value=args.get("value", ""),
param_type=args.get("type", "string"),
min_val=args.get("min"),
max_val=args.get("max"),
)
if "values" in args:
param.values = args["values"].split()
self.params[param.name] = param
logger.debug(f"Engine param: {param.name}={param.value}")
elif cmd == "wait":
break
# Send init command
self._send("init")
# Wait for ready
while True:
if time.time() - start_time > self.init_timeout:
self.quit()
raise TimeoutError("Engine initialization timed out waiting for ready")
line = self._read_line(timeout=1.0)
if line is None:
continue
cmd, _ = parse_hub_line(line)
if cmd == "ready":
break
self._started = True
logger.info("Engine ready")
[docs]
def quit(self) -> None:
"""Send quit command and terminate the engine subprocess."""
if self.process is None:
return
try:
self._send("quit")
self.process.wait(timeout=2.0)
except Exception:
self.process.kill()
finally:
self.process = None
self._started = False
logger.info("Engine terminated")
def set_variant(self, variant: str) -> None:
"""
Set the engine variant.
Args:
variant: Hub variant name ("normal", "frisian", "killer", "bt", "losing")
Note: Must be called before start() to take effect.
"""
self._variant = variant
def new_game(self) -> None:
"""
Signal the start of a new game (clears transposition table).
"""
self._send("new-game")
[docs]
def get_best_move(
self, board: BaseBoard, with_evaluation: bool = False
) -> Move | tuple[Move, float]:
"""
Get the best move for the given board position.
Implements the Engine interface. Sends the position to the engine,
starts a search, and returns the best move.
Args:
board: The current board state
with_evaluation: If True, return (move, score) tuple
Returns:
Move object, or (Move, score) if with_evaluation=True
Raises:
RuntimeError: If engine not started or search fails
ValueError: If board variant not supported
"""
if not self._started or self.process is None:
raise RuntimeError("Engine not started. Call start() first.")
# Verify board is 10x10
if len(board.position) != 50:
raise ValueError(
f"Hub protocol only supports 10x10 boards, got {len(board.position)} squares"
)
# Convert position to Hub format
hub_pos = board_to_hub_position(board)
logger.debug(f"Sending position: {hub_pos}")
logger.debug(f"Board FEN: {board.fen}")
logger.debug(f"Turn: {'White' if board.turn == Color.WHITE else 'Black'}")
# Build position command
# Note: We currently don't send move history for repetition detection
# because it requires sending moves that lead FROM a starting position
# TO the current position, not the other way around.
# TODO: Implement proper move history for repetition detection
self._send(f"pos pos={hub_pos}")
# Set search limits
if self.depth_limit is not None:
self._send(f"level depth={self.depth_limit}")
else:
self._send(f"level move-time={self.time_limit}")
# Start search
self._send("go think")
# Read search output until done
result = self._read_search_result()
# Parse the move and match against legal moves
move = parse_hub_move(result.move, board.legal_moves, board=board)
if with_evaluation:
score = result.info.score if result.info.score is not None else 0.0
return move, score
return move
def ping(self) -> bool:
"""
Send ping and wait for pong response.
Returns:
True if engine responded, False on timeout
"""
self._send("ping")
line = self._read_line(timeout=5.0)
if line:
cmd, _ = parse_hub_line(line)
return cmd == "pong"
return False
def _send(self, command: str) -> None:
"""Send a command to the engine."""
if self.process is None or self.process.stdin is None:
raise RuntimeError("Engine process not running")
logger.debug(f">> {command}")
self.process.stdin.write(command + "\n")
self.process.stdin.flush()
def _read_line(self, timeout: float = 1.0) -> Optional[str]:
"""
Read a line from engine stdout with timeout.
Returns:
Line string (stripped) or None on timeout
"""
if self.process is None or self.process.stdout is None:
return None
# Simple blocking read (for now, no async)
# In production, consider using select() or threading for proper timeout
import select
import sys
if sys.platform == "win32":
# Windows doesn't support select on pipes, use blocking read
# The engine should respond quickly enough
try:
line = self.process.stdout.readline()
if line:
line = line.strip()
logger.debug(f"<< {line}")
return line
except Exception:
pass
return None
else:
# Unix: use select for timeout
ready, _, _ = select.select([self.process.stdout], [], [], timeout)
if ready:
line = self.process.stdout.readline().strip()
logger.debug(f"<< {line}")
return line
return None
def _read_search_result(self) -> SearchResult:
"""
Read search output until 'done' line is received.
Returns:
SearchResult with move and search info
"""
result = SearchResult(move="", info=SearchInfo())
while True:
timeout_val = self.time_limit * 2 if self.time_limit else 30.0
line = self._read_line(timeout=max(timeout_val, 30.0))
if line is None:
continue
cmd, args = parse_hub_line(line)
if cmd == "info":
# Update search info
if "depth" in args:
result.info.depth = int(args["depth"])
if "mean-depth" in args:
result.info.mean_depth = float(args["mean-depth"])
if "score" in args:
result.info.score = float(args["score"])
if "nodes" in args:
result.info.nodes = int(args["nodes"])
if "time" in args:
result.info.time = float(args["time"])
if "nps" in args:
result.info.nps = float(args["nps"])
if "pv" in args:
result.info.pv = args["pv"]
elif cmd == "done":
result.move = args.get("move", "")
result.ponder = args.get("ponder")
if not result.move:
raise RuntimeError("Engine returned 'done' without move")
break
elif cmd == "error":
msg = args.get("message", "Unknown error")
raise RuntimeError(f"Engine error: {msg}")
return result
def _get_king_move_history(self, board: BaseBoard) -> list[Move]:
"""
Get king moves from move history for repetition detection.
Hub protocol only needs king moves (reversible moves) for
detecting draw by repetition.
Returns:
List of king moves from the move stack
"""
king_moves = []
for move in board._moves_stack:
# A move is a king move if:
# - No captures (non-reversible)
# - The piece at destination is a king (was a king before moving)
if not move.captured_list:
# Check if it was a king move by looking at current position
# (the king is now at the last square of the move)
end_sq = move.square_list[-1]
piece = board.position[end_sq]
if abs(piece) == Figure.KING.value:
king_moves.append(move)
return king_moves