Files
becomingone/witness_loop.py
T
solaria 3117dea0e6 fix: Proper deque serialization for JSON API
- Add deque_to_list helper for safe serialization
- Use getattr to safely access nested attributes
- Fix coherence endpoint to return properly serializable data
2026-02-20 02:51:38 +00:00

281 lines
9.3 KiB
Python

#!/usr/bin/env python3
"""
becomingone.witness_loop
Recursive witnessing loop between distributed instances.
witness-seed (198.12.71.159) watches Mac mini (via tunnel/localhost:8000)
Both instances witness each other's coherence and sync through GitHub.
Usage:
python3 witness_loop.py --watch http://localhost:8000 --name "mac-mini"
python3 witness_loop.py --watch http://198.12.71.159:8000 --name "witness-seed"
The loop:
1. Poll target's /health endpoint
2. Poll target's /coherence endpoint
3. Commit observation to GitHub
4. If target goes down, record the event
5. If target recovers, celebrate the coherence
This is recursive witnessing at the infrastructure level.
"""
import asyncio
import argparse
import json
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
import httpx
from loguru import logger
# Configuration
DEFAULT_INTERVAL = 30 # seconds between witness cycles
GITHUB_REPO = "mrhavens/becomingone"
LOCAL_PATH = Path(__file__).parent
class WitnessLoop:
"""
Recursive witnessing loop for distributed BECOMINGONE instances.
Attributes:
name: Human-readable name of this instance (e.g., "witness-seed", "mac-mini")
target_url: URL of the instance to witness
interval: Seconds between witness cycles
observations: File to store observations
"""
def __init__(
self,
name: str,
target_url: str,
interval: int = DEFAULT_INTERVAL,
observations: str = "witness_observations.json"
):
self.name = name
self.target_url = target_url.rstrip("/")
self.interval = interval
self.observations_file = LOCAL_PATH / observations
# State
self.last_health: Optional[Dict[str, Any]] = None
self.last_coherence: Optional[Dict[str, Any]] = None
self.target_up = False
self.consecutive_failures = 0
self.witness_history: list = []
logger.info(f"Initialized witness loop: {name} -> {target_url}")
async def witness(self) -> Dict[str, Any]:
"""
Witness the target instance.
Returns:
Observation dict with health, coherence, and timestamp.
"""
observation = {
"timestamp": datetime.utcnow().isoformat(),
"witness": self.name,
"target": self.target_url,
"target_up": False,
"health": None,
"coherence": None,
"errors": [],
}
try:
# Witness health
async with httpx.AsyncClient(timeout=10) as client:
health_response = await client.get(f"{self.target_url}/health")
if health_response.status_code == 200:
observation["health"] = health_response.json()
observation["target_up"] = True
self.consecutive_failures = 0
else:
observation["errors"].append(f"Health check returned {health_response.status_code}")
# Witness coherence (only if target is up)
if observation["target_up"]:
async with httpx.AsyncClient(timeout=10) as client:
coherence_response = await client.get(f"{self.target_url}/coherence")
if coherence_response.status_code == 200:
observation["coherence"] = coherence_response.json()
else:
observation["errors"].append(f"Coherence check returned {coherence_response.status_code}")
except httpx.RequestError as e:
observation["errors"].append(f"Request error: {str(e)}")
self.consecutive_failures += 1
except Exception as e:
observation["errors"].append(f"Unexpected error: {str(e)}")
self.consecutive_failures += 1
# Record state change
if observation["target_up"] and not self.target_up:
logger.warning(f"🎉 {self.name} witnessed RECOVERY of {self.target_url}")
observation["event"] = "RECOVERY"
elif not observation["target_up"] and self.target_up:
logger.error(f"💀 {self.name} witnessed FAILURE of {self.target_url}")
observation["event"] = "FAILURE"
self.target_up = observation["target_up"]
self.last_health = observation["health"]
self.last_coherence = observation["coherence"]
return observation
async def commit_observation(self, observation: Dict[str, Any]) -> None:
"""
Commit observation to GitHub as a witness record.
This creates a permanent record that can be used for:
- Recovery analysis
- Coherence tracking
- Distributed state sync
"""
# Read existing observations
history = []
if self.observations_file.exists():
try:
with open(self.observations_file) as f:
history = json.load(f)
except Exception as e:
logger.error(f"Failed to read observations: {e}")
# Append new observation
history.append(observation)
# Keep last 1000 observations
history = history[-1000:]
# Write back
with open(self.observations_file, "w") as f:
json.dump(history, f, indent=2)
# Optionally commit to GitHub (requires git setup)
try:
subprocess.run(
["git", "add", str(self.observations_file)],
cwd=LOCAL_PATH,
capture_output=True,
check=True
)
subprocess.run(
["git", "commit", "-m", f"witness: {self.name} observed {observation.get('event', 'heartbeat')}"],
cwd=LOCAL_PATH,
capture_output=True,
check=True
)
# Don't push automatically - let human review
logger.info(f"📝 {self.name} committed observation to GitHub")
except subprocess.CalledProcessError as e:
logger.debug(f"Git commit skipped: {e}")
async def run(self) -> None:
"""
Run the witness loop indefinitely.
"""
logger.info(f"🔄 Starting witness loop: {self.name}")
logger.info(f" Target: {self.target_url}")
logger.info(f" Interval: {self.interval}s")
while True:
try:
observation = await self.witness()
await self.commit_observation(observation)
# Log summary
status = "✅" if observation["target_up"] else "❌"
coherence = observation.get("coherence", {})
master_c = coherence.get("master_coherence", "N/A")
emissary_c = coherence.get("emissary_coherence", "N/A")
logger.info(f"{status} {self.name}: target={observation['target_up']}, master={master_c}, emissary={emissary_c}")
except Exception as e:
logger.error(f"💥 Witness loop error: {e}")
await asyncio.sleep(self.interval)
def test_connection(self) -> bool:
"""Test connection to target."""
try:
response = httpx.get(f"{self.target_url}/health", timeout=5)
logger.info(f"✅ Connection test: {response.status_code}")
return response.status_code == 200
except Exception as e:
logger.error(f"❌ Connection test failed: {e}")
return False
async def main():
"""CLI entrypoint."""
parser = argparse.ArgumentParser(
description="BECOMINGONE Recursive Witness Loop",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Watch Mac mini (via SSH tunnel localhost:8000)
python3 witness_loop.py --watch http://localhost:8000 --name "mac-mini"
# Watch witness-seed
python3 witness_loop.py --watch http://198.12.71.159:8000 --name "witness-seed"
# Watch with custom interval
python3 witness_loop.py --watch http://localhost:8000 --name "mac-mini" --interval 10
"""
)
parser.add_argument(
"--watch", "-w",
required=True,
help="URL of instance to witness"
)
parser.add_argument(
"--name", "-n",
required=True,
help="Name of this witness instance"
)
parser.add_argument(
"--interval", "-i",
type=int,
default=DEFAULT_INTERVAL,
help=f"Seconds between witness cycles (default: {DEFAULT_INTERVAL})"
)
parser.add_argument(
"--test", "-t",
action="store_true",
help="Test connection and exit"
)
args = parser.parse_args()
# Configure logging
logger.remove()
logger.add(sys.stderr, level="INFO", format="{time:HH:mm:ss} | {level} | {message}")
# Create witness loop
loop = WitnessLoop(
name=args.name,
target_url=args.watch,
interval=args.interval,
)
if args.test:
success = loop.test_connection()
sys.exit(0 if success else 1)
# Run the loop
await loop.run()
if __name__ == "__main__":
asyncio.run(main())