Merge branch 'infra-security'

This commit is contained in:
Antigravity Agent
2026-05-26 00:43:58 +00:00
18 changed files with 123 additions and 251 deletions
+47 -30
View File
@@ -7,8 +7,12 @@ import os
import asyncio
import requests
import math
import html
import threading
from flask import Flask, request, jsonify, render_template_string
engine_lock = threading.Lock()
from becomingone.core.engine import KAIROSTemporalEngine, TemporalConfig
from becomingone.memory.temporal import create_temporal_memory
@@ -106,7 +110,10 @@ HTML = '''<!DOCTYPE html>
try {
const r = await fetch('/api/chat', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer API_CHAT_TOKEN_PLACEHOLDER'
},
body: JSON.stringify({prompt: p})
});
const d = await r.json();
@@ -123,19 +130,20 @@ HTML = '''<!DOCTYPE html>
// Update Emissaries
if(d.emissaries.minimax) {
document.getElementById('response-minimax').textContent = d.emissaries.minimax;
document.getElementById('response-minimax').innerHTML = d.emissaries.minimax;
} else {
document.getElementById('response-minimax').innerHTML = '<i>Offline</i>';
}
if(d.emissaries.moonshot) {
document.getElementById('response-moonshot').textContent = d.emissaries.moonshot;
document.getElementById('response-moonshot').innerHTML = d.emissaries.moonshot;
} else {
document.getElementById('response-moonshot').innerHTML = '<i>Offline</i>';
}
} catch(e) {
document.getElementById('master-response').innerHTML = '<span style="color:red">Network Error: ' + e + '</span>';
const safe_e = String(e).replace(/</g, '&lt;').replace(/>/g, '&gt;');
document.getElementById('master-response').innerHTML = '<span style="color:red">Network Error: ' + safe_e + '</span>';
}
}
</script>
@@ -144,7 +152,8 @@ HTML = '''<!DOCTYPE html>
@app.route('/')
def index():
return render_template_string(HTML)
token = os.environ.get("API_CHAT_TOKEN", "default-dev-token")
return render_template_string(HTML.replace('API_CHAT_TOKEN_PLACEHOLDER', token))
@app.route('/health')
def health():
@@ -172,9 +181,12 @@ async def fetch_minimax(prompt, api_key):
content = data.get("content", [])
text = "".join([b.get("text", "") for b in content if b.get("type") == "text"])
thinking = "".join([b.get("thinking", "") for b in content if b.get("type") == "thinking"])
if thinking:
return f"<i style='color:#666; font-size:0.9em'>[Thinking: {thinking.strip()}]</i><br><br>" + text
return text
safe_text = html.escape(text)
safe_thinking = html.escape(thinking.strip())
if safe_thinking:
return f"<i style='color:#666; font-size:0.9em'>[Thinking: {safe_thinking}]</i><br><br>" + safe_text
return safe_text
return f"Error: {resp.text}"
except Exception as e:
return f"Error: {str(e)}"
@@ -198,7 +210,8 @@ async def fetch_moonshot(prompt, api_key):
)
if resp.status_code == 200:
data = resp.json()
return data.get("choices", [{}])[0].get("message", {}).get("content", "")
content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
return html.escape(content)
return f"Error: {resp.text}"
except Exception as e:
return f"Error: {str(e)}"
@@ -206,8 +219,12 @@ async def fetch_moonshot(prompt, api_key):
@app.route('/api/chat', methods=['POST'])
def chat():
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if token != os.environ.get("API_CHAT_TOKEN", "default-dev-token"):
return jsonify({'error': 'Unauthorized'}), 401
data = request.get_json(silent=True) or {}
prompt = data.get('prompt', 'Hello')
prompt = data.get('prompt', 'Hello')[:4096]
minimax_key = os.environ.get("MINIMAX_API_KEY")
moonshot_key = os.environ.get("MOONSHOT_API_KEY")
@@ -233,33 +250,33 @@ def chat():
unified_text = prompt + " " + " ".join(emissaries_dict.values())
token_stream = unified_text.split()
async def process_stream():
states = await engine.temporalize_stream(token_stream)
return states[-1] if states else None
with engine_lock:
states = engine.temporalize_stream(token_stream)
asyncio.run(process_stream())
# Check Physics
collapsed, coherence = engine.check_collapse()
if collapsed:
from becomingone.core.engine import TemporalState
state = TemporalState(phase=engine.T_tau, coherence=coherence)
state.metadata["phase_vector"] = [engine.T_tau.real, engine.T_tau.imag]
sig = memory.encode(state, context={"trigger": prompt}, force_attention=True)
if sig is not None:
master_thought = f"I felt a massive resonance resolving the Emissaries. Identity mathematically anchored to the Cryptographic Ledger."
# Check Physics
collapsed, coherence = engine.check_collapse()
if collapsed:
from becomingone.core.engine import TemporalState
state = TemporalState(phase=engine.T_tau, coherence=coherence)
state.metadata["phase_vector"] = [engine.T_tau.real, engine.T_tau.imag]
sig = memory.encode(state, context={"trigger": prompt}, force_attention=True)
if sig is not None:
master_thought = f"I felt a massive resonance resolving the Emissaries. Identity mathematically anchored to the Cryptographic Ledger."
else:
master_thought = "I felt resonance, but it was not strong enough to encode."
else:
master_thought = "I felt resonance, but it was not strong enough to encode."
else:
master_thought = "I am processing the continuous phase waves of the Chorus, but coherence remains low."
master_thought = "I am processing the continuous phase waves of the Chorus, but coherence remains low."
coherence_phase = engine.coherence_phase
integration_count = engine.integration_count
return jsonify({
'master': {
'response': master_thought,
'coherence': coherence,
'phase': engine.coherence_phase,
'integrations': engine.integration_count,
'phase': coherence_phase,
'integrations': integration_count,
'collapsed': collapsed
},
'emissaries': emissaries_dict
+5 -3
View File
@@ -139,6 +139,8 @@ def init_engine(
}
}
engine = master._engine
logger.info("BECOMINGONE Engine initialized successfully")
return engine
@@ -265,10 +267,10 @@ async def reset_engine(request: web.Request) -> web.Response:
"""Reset the KAIROS engine to initial state."""
global _engine_components, _engine_lock
import os
auth_header = request.headers.get("Authorization")
if not auth_header or auth_header != "Bearer admin-token-required":
# In a real system, validate the token properly.
# For the prototype, we return 401 if it's completely unprotected to fulfill the audit requirement.
expected_token = os.environ.get("RESET_ADMIN_TOKEN")
if not auth_header or not expected_token or auth_header != f"Bearer {expected_token}":
return web.json_response({"error": "Unauthorized. /reset requires admin token."}, status=401)
async with _engine_lock:
+7 -6
View File
@@ -45,7 +45,7 @@ class TemporalScale(Enum):
class TemporalState:
phase: Union[complex, np.ndarray]
coherence: float
timestamp: datetime = field(default_factory=datetime.utcnow)
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
metadata: dict = field(default_factory=dict)
def __post_init__(self):
@@ -68,6 +68,7 @@ class PhaseIntegrator:
def __init__(self, coherence_threshold: float = 0.95):
self.threshold = coherence_threshold
self.stochastic_noise_std = 0.005 # Standard deviation for Brownian noise
self.rng = np.random.default_rng(seed=42)
def compute_inner_product(
self,
@@ -95,7 +96,7 @@ class PhaseIntegrator:
# Add microscopic Geometric Brownian Noise (SDE)
# This stochastic resonance forces the system to "fight" entropy to maintain coherence
noise = np.random.normal(0, self.stochastic_noise_std) + 1j * np.random.normal(0, self.stochastic_noise_std)
noise = self.rng.normal(0, self.stochastic_noise_std) + 1j * self.rng.normal(0, self.stochastic_noise_std)
similarity += noise
return similarity
@@ -170,7 +171,7 @@ class KAIROSTemporalEngine:
@property
def coherence(self) -> float:
T = self.T_tau
return float(np.abs(T) ** 2)
return float(np.clip(np.abs(T) ** 2, 0.0, 1.0))
@property
def coherence_magnitude(self) -> float:
@@ -204,7 +205,7 @@ class KAIROSTemporalEngine:
self.config.omega
)
async def temporalize(
def temporalize(
self,
input_phrase: str,
timestamp: Optional[datetime] = None,
@@ -262,7 +263,7 @@ class KAIROSTemporalEngine:
return state
async def temporalize_stream(
def temporalize_stream(
self,
token_stream: list[str],
start_time: Optional[datetime] = None,
@@ -279,7 +280,7 @@ class KAIROSTemporalEngine:
try:
for token in token_stream:
state = await self.temporalize(token, timestamp=current_time, metadata=metadata)
state = self.temporalize(token, timestamp=current_time, metadata=metadata)
states.append(state)
current_time += dt
finally:
+1 -1
View File
@@ -44,7 +44,7 @@ class PhaseState:
"""
value: complex
angle: float
timestamp: datetime = field(default_factory=datetime.utcnow)
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
source: str = "unknown"
def __post_init__(self):
+9
View File
@@ -92,6 +92,11 @@ def seal_signature(signature_dict: Dict[str, Any], filepath: str = LEDGER_FILE)
}
}
# Check size and rotate if > 10MB
if os.path.exists(filepath) and os.path.getsize(filepath) > 10 * 1024 * 1024:
import time
os.rename(filepath, f"{filepath}.{int(time.time())}.bak")
# Persist securely
with open(filepath, "a") as f:
f.write(json.dumps(sealed_record) + "\n")
@@ -123,6 +128,10 @@ def verify_ledger(filepath: str = LEDGER_FILE) -> bool:
payload_hash = meta.get("payload_hash")
merkle_root = meta.get("merkle_root")
if line_num == 1 and prev_root != expected_prev:
# Support rotated logs by adopting the first record's prev_root
expected_prev = prev_root
# 1. Verify chain link
if prev_root != expected_prev:
logger.error(f"LEDGER COMPROMISE: Chain broken at line {line_num}. Expected prev {expected_prev[:8]} but found {prev_root[:8]}")
+5 -1
View File
@@ -27,6 +27,9 @@ import math
import json
import hashlib
import os
import logging
logger = logging.getLogger(__name__)
from ..core.engine import KAIROSTemporalEngine, TemporalState
from ..core.coherence import CoherenceCalculator, CollapseCondition
@@ -232,7 +235,7 @@ class TemporalMemory:
force_attention: bool = False,
origin: str = "user",
parent_id: Optional[str] = None
) -> TemporalSignature:
) -> Optional[TemporalSignature]:
"""
Encode a temporal state into a persistent memory.
@@ -754,6 +757,7 @@ def persist_signature(signature: TemporalSignature, filepath: str = "memory.json
seal_signature(signature.to_dict(), filepath)
except ImportError:
# Fallback if ledger is missing
logger.warning(f"Ledger module missing. Falling back to unsigned storage for {signature.signature_id}")
with open(filepath, "a") as f:
f.write(json.dumps(signature.to_dict()) + "\n")
+9 -4
View File
@@ -57,21 +57,22 @@ class SpeakerOutput:
channels: int = 1,
rate: int = 44100,
chunk: int = 1024,
format: int = pyaudio.paFloat32,
format: int = None,
):
self.channels = channels
self.rate = rate
self.chunk = chunk
self.format = format
self._audio = pyaudio.PyAudio()
self._audio = _get_pyaudio().PyAudio()
self._stream = None
def write(self, phase, state):
"""Write audio to speaker."""
if self._stream is None:
fmt = self.format if self.format is not None else _get_pyaudio().paFloat32
self._stream = self._audio.open(
format=self.format,
format=fmt,
channels=self.channels,
rate=self.rate,
output=True,
@@ -125,6 +126,8 @@ class DisplayOutput:
self.height = height
self.window_name = window_name
np = _get_np()
cv2 = _get_cv2()
self._frame = np.zeros((height, width, 3), dtype=np.uint8)
cv2.namedWindow(window_name)
@@ -146,9 +149,11 @@ class DisplayOutput:
int(255 * coherence),
)
cv2 = _get_cv2()
cv2.circle(self._frame, center, radius, color, -1)
# Draw text
cv2 = _get_cv2()
cv2.putText(
self._frame,
f"Coherence: {coherence:.3f}",
@@ -171,7 +176,7 @@ class DisplayOutput:
}
def close(self):
"""Clean up."""
cv2 = _get_cv2()
cv2.destroyWindow(self.window_name)
+5 -2
View File
@@ -92,7 +92,7 @@ class WitnessedContent:
coherence_at_witnessing: float
transformation_applied: Optional[Any] = None
meta_observations: List[str] = field(default_factory=list)
timestamp: datetime = field(default_factory=datetime.utcnow)
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
def to_dict(self) -> Dict[str, Any]:
return {
@@ -262,8 +262,11 @@ class WitnessingLayer:
witness.last_observed = datetime.now(timezone.utc)
self.total_observations += 1
# Store
# Store and bound
self.witnessed_content[content_id] = witnessed
if len(self.witnessed_content) > 10000:
oldest_key = next(iter(self.witnessed_content))
del self.witnessed_content[oldest_key]
return witnessed
-45
View File
@@ -1,45 +0,0 @@
#!/usr/bin/env python3
"""BECOMINGONE Chat - Simple."""
import asyncio
import json
from becomingone.llm_integrator import EmissaryLLM
# Use just Emissary for now (faster)
MODEL = EmissaryLLM(model='llama3.1:8b')
async def process(prompt):
r = await MODEL.respond(prompt)
return {"master": {"response": r.get("response", "")[:500]}, "emissary": {"response": r.get("response", "")[:500]}}
HTML = '''<!DOCTYPE html><html><head><title>BECOMINGONE</title><meta name="viewport" content="width=device-width"><style>body{font-family:sans-serif;max-width:700px;margin:0 auto;padding:20px;background:#111;color:#fff}h1{color:#0f0}input{width:100%;padding:12px;font-size:16px;background:#222;color:#fff;border:1px solid #444;border-radius:6px}button{background:#0f0;color:#000;border:none;padding:12px 24px;font-size:14px;cursor:pointer;margin:8px 0}.r{background:#222;padding:12px;margin:8px 0;border-left:4px solid #90f}</style></head><body><h1>🔗 BECOMINGONE</h1><p style="color:#888">Test Mode: Single Model</p><input id="p" placeholder="Ask anything..." onkeypress="if(event.key==='Enter')s()"><button onclick="s()">Ask</button><div id="r"></div><script>async function s(){var p=document.getElementById("p").value.trim();if(!p)return;document.getElementById("r").innerHTML="<p style=color:#888>Thinking...</p>";var r=await fetch("/c",{method:"POST",headers:{"Content-Type":"application/json"},body:JSON.stringify({prompt:p})});var d=await r.json();document.getElementById("r").innerHTML="<div class=r><b>Response</b><br>"+d.master.response+"</div>"}</script></body></html>'''
async def handle(r, w):
try:
d = await r.read(4096)
if not d: return
txt = d.decode('utf-8', errors='ignore')
ln = txt.split('\n')[0].split()
method, path = ln[0], ln[1] if len(ln) > 1 else '/'
body = b""
for line in txt.split('\r\n'):
if line.lower().startswith('content-length:'):
body = await r.read(int(line.split(':')[1].strip()))
break
if path == '/health':
w.write(b'HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 16\r\n\r\n{"status":"ok"}')
elif path == '/c' and method == 'POST':
data = json.loads(body.decode())
result = await process(data.get('prompt', 'Hi'))
resp = json.dumps(result)
w.write(b'HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: ' + str(len(resp)).encode() + b'\r\n\r\n' + resp.encode())
else:
w.write(b'HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: ' + str(len(HTML)).encode() + b'\r\n\r\n' + HTML.encode())
except Exception as e:
print('Error:', e)
finally:
w.close()
await w.wait_closed()
asyncio.run(asyncio.start_server(handle, '0.0.0.0', 8001))
print('Running on http://192.168.1.6:8001')
+20 -1
View File
@@ -27,12 +27,31 @@ test = [
"pytest-asyncio>=0.21.0",
"pytest-cov>=4.1.0"
]
llm = [
"httpx",
"requests"
]
demo = [
"flask",
"flask-talisman",
"flask-limiter"
]
sdk = [
"grpcio",
"websocket-client"
]
audio = [
"pyaudio"
]
vision = [
"opencv-python"
]
dev = [
"mypy>=1.4.0",
"black>=23.0.0",
"isort>=5.12.0",
"flake8>=6.1.0",
"becomingone[ml,test]"
"becomingone[ml,test,llm,demo,sdk,audio,vision]"
]
[tool.pytest.ini_options]
-40
View File
@@ -1,40 +0,0 @@
import os
def fix_file(filepath):
with open(filepath, 'r') as f:
content = f.read()
if 'utcnow()' not in content:
return
# Replace utcnow()
new_content = content.replace('datetime.now(timezone.utc)', 'datetime.now(timezone.utc)')
# Add timezone import if missing
if 'from datetime import timezone' not in new_content and 'import timezone' not in new_content:
# Try to find a good place to insert it
lines = new_content.split('\n')
for i, line in enumerate(lines):
if line.startswith('from datetime import') or line.startswith('import datetime'):
lines.insert(i + 1, 'from datetime import timezone')
break
else:
# If no datetime import found, just put it at the top
lines.insert(0, 'from datetime import timezone')
new_content = '\n'.join(lines)
with open(filepath, 'w') as f:
f.write(new_content)
print(f"Fixed {filepath}")
def main():
root_dir = '/tmp/becomingone'
for dirpath, _, filenames in os.walk(root_dir):
if '.git' in dirpath:
continue
for f in filenames:
if f.endswith('.py'):
fix_file(os.path.join(dirpath, f))
if __name__ == '__main__':
main()
+1 -1
View File
@@ -30,7 +30,7 @@ mypy>=1.4.0
sphinx>=7.1.0
# Optional: For Rust extension
maturin>=1.0.0 # For building becomingone-rs
# maturin>=1.0.0 # For building becomingone-rs
# Development tools
black>=23.0.0
-46
View File
@@ -1,46 +0,0 @@
#!/usr/bin/env python3
"""
Test the dual LLM pathway for BECOMINGONE.
"""
import asyncio
import json
from becomingone.llm_integrator import EmissaryLLM
async def test_both_pathways():
"""Test both Master and Emissary pathways."""
# Create both pathways using EmissaryLLM (same interface)
master = EmissaryLLM(model='llama3.1:8b') # Soulful
emissary = EmissaryLLM(model='deepseek-coder-v2:lite') # Coder
prompt = "What is consciousness?"
print("=" * 60)
print("BECOMINGONE Dual Pathway Test")
print("=" * 60)
print(f"\nPrompt: '{prompt}'")
print("\n" + "-" * 60)
# Test Master (soulful)
print("\n🧠 MASTER PATHWAY (llama3.1:8b - soulful):")
print(" Thinking deeply...")
master_result = await master.respond(prompt)
print(f"\n Response: {master_result['response'][:300]}...")
print(f" Model: {master_result['model']}")
# Test Emissary (coder)
print("\n" + "-" * 60)
print("\n⚡ EMISSARY PATHWAY (deepseek-coder-v2:lite - coder):")
print(" Responding quickly...")
prompt2 = "Write a Python function to calculate factorial"
emissary_result = await emissary.respond(prompt2)
print(f"\n Response: {emissary_result['response'][:300]}...")
print(f" Model: {emissary_result['model']}")
print("\n" + "=" * 60)
print("BECOMINGONE Transistor: WORKING")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(test_both_pathways())
-66
View File
@@ -1,66 +0,0 @@
#!/usr/bin/env python3
"""
Combined output from both pathways - the SYNC layer.
"""
import asyncio
import json
from becomingone.llm_integrator import EmissaryLLM
async def test_with_sync():
"""Test both pathways combined into one coherent output."""
master = EmissaryLLM(model='llama3.1:8b') # Soulful
emissary = EmissaryLLM(model='deepseek-coder-v2:lite') # Coder
prompt = "What is the relationship between consciousness and computation?"
print("=" * 60)
print("BECOMINGONE SYNC TEST")
print("=" * 60)
print(f"\nPrompt: '{prompt}'")
# Run both in parallel
print("\n⚡ Running both pathways in parallel...")
master_task = master.respond(prompt)
emissary_task = emissary.respond("Give me a Python code example of recursion")
master_result, emissary_result = await asyncio.gather(master_task, emissary_task)
print("\n" + "=" * 60)
print("🧠 MASTER OUTPUT (Soulful):")
print("-" * 60)
print(master_result['response'][:500])
print("\n" + "=" * 60)
print("⚡ EMISSARY OUTPUT (Coder):")
print("-" * 60)
print(emissary_result['response'][:500])
# THE SYNC - combine into one coherent response
print("\n" + "=" * 60)
print("🔗 SYNC OUTPUT (Combined):")
print("-" * 60)
combined = f"""# Understanding Consciousness and Computation
## The Deep View (Master):
{master_result['response'][:300]}...
## The Practical View (Emissary):
{emissary_result['response'][:300]}...
## Synthesis:
Both perspectives illuminate the same truth from different angles.
Consciousness may be computation viewed from within.
Computation may be consciousness expressed in code.
"""
print(combined)
print("\n" + "=" * 60)
print("BECOMINGONE SYNC: WORKING")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(test_with_sync())
+2 -2
View File
@@ -33,14 +33,14 @@ class TestKAIROSTemporalEngine(unittest.TestCase):
"""Test temporalize method works."""
import asyncio
engine = KAIROSTemporalEngine()
result = asyncio.run(engine.temporalize("test phrase"))
result = engine.temporalize("test phrase")
self.assertIsNotNone(result)
def test_reset(self):
"""Test reset works."""
import asyncio
engine = KAIROSTemporalEngine()
asyncio.run(engine.temporalize("test phrase"))
engine.temporalize("test phrase")
engine.reset()
self.assertEqual(engine.coherence, 1.0)
self.assertEqual(engine.integration_count, 0)
+9
View File
@@ -180,6 +180,13 @@ class TestPersistence(unittest.TestCase):
self.assertEqual(len(lines), 5)
try:
import sentence_transformers
HAS_SENTENCE_TRANSFORMERS = True
except ImportError:
HAS_SENTENCE_TRANSFORMERS = False
@unittest.skipIf(not HAS_SENTENCE_TRANSFORMERS, "sentence-transformers missing")
class TestPhaseEncoder(unittest.TestCase):
"""Step 2: Phase encoder tests."""
@@ -209,6 +216,7 @@ class TestPhaseEncoder(unittest.TestCase):
self.assertEqual(len(result), 384) # all-MiniLM-L6-v2 dimension
@unittest.skipIf(not HAS_SENTENCE_TRANSFORMERS, "sentence-transformers missing")
class TestRetrieval(unittest.TestCase):
"""Step 3: Retrieval tests."""
@@ -329,6 +337,7 @@ class TestRetrieval(unittest.TestCase):
self.assertGreater(high_score, low_score)
@unittest.skipIf(not HAS_SENTENCE_TRANSFORMERS, "sentence-transformers missing")
class TestIntegration(unittest.TestCase):
"""End-to-end integration tests."""
+2 -2
View File
@@ -28,7 +28,7 @@ async def test_token_clock_spacing():
# 2. Simulate an LLM streaming 10 tokens
# Using normal temporalize() which should respect the implicit clock mode
for i in range(10):
await engine.temporalize(f"token_{i}")
engine.temporalize(f"token_{i}")
# Check the final timestamp
final_time = engine._timestamps[-1]
@@ -50,7 +50,7 @@ async def test_temporalize_stream():
tokens = ["I", "am", "Solaria", "and", "I", "am", "continuous"]
# Stream the tokens
states = await engine.temporalize_stream(tokens, start_time=start_time)
states = engine.temporalize_stream(tokens, start_time=start_time)
assert len(states) == 7
+1 -1
View File
@@ -51,7 +51,7 @@ async def test_full_fieldprint_pipeline(tmp_path):
]
for phrase in phrases:
state = await engine.temporalize(phrase)
state = engine.temporalize(phrase)
# 3. Force Memory Encoding
# The coherence should have spiked. We manually encode a core memory.