import streamlit as st | |
import random | |
import json | |
import os | |
import boto3 | |
from pymongo import MongoClient | |
from bson.objectid import ObjectId | |
# Constants | |
BOARD_SIZE = 10 | |
EMPTY = ':ocean:' | |
HIT = ':boom:' | |
MISS = ':heavy_multiplication_x:' | |
SHIPS = [ | |
{"name": "Carrier", "size": 5, "symbol": ":ship:"}, | |
{"name": "Battleship", "size": 4, "symbol": ":motor_boat:"}, | |
{"name": "Cruiser", "size": 3, "symbol": ":boat:"}, | |
{"name": "Submarine", "size": 3, "symbol": ":ferry:"}, | |
{"name": "Destroyer", "size": 2, "symbol": ":speedboat:"} | |
] | |
# MongoDB Atlas Connection | |
client = MongoClient(os.environ.get('MONGODB_ATLAS_URI')) | |
db = client['battleship'] | |
games = db['games'] | |
# AWS Bedrock Client Setup | |
bedrock_runtime = boto3.client('bedrock-runtime', | |
aws_access_key_id=os.environ.get('AWS_ACCESS_KEY'), | |
aws_secret_access_key=os.environ.get('AWS_SECRET_KEY'), | |
region_name="us-west-2") | |
def get_bedrock_claude_move(board, openent_moves=None): | |
""" | |
Get the next move from Claude AI using AWS Bedrock. | |
""" | |
print("inside get_bedrock_claude_move") | |
claude_body = json.dumps({ | |
"anthropic_version": "bedrock-2023-05-31", | |
"max_tokens": 5000, | |
"temperature": 0, | |
"system": f"Please provide the next move as an opponent in the battleship game , the board is {BOARD_SIZE} X {BOARD_SIZE} . Be smart and stratigic. Respond only in JSON format strictly: 'row' : ... , 'col' : ... , 'entertainment_comment' and nothing more . ", | |
"messages": [{ | |
"role": "user", | |
"content": [ | |
{"type": "text", "text": f"The current board: {board} and your moves were: {openent_moves if openent_moves else 'None'}"} | |
] | |
}] | |
}) | |
model_id = "anthropic.claude-3-5-sonnet-20241022-v2:0" if st.session_state.difficulty == "Expert" else "anthropic.claude-3-5-sonnet-20240620-v1:0" | |
response = bedrock_runtime.invoke_model( | |
body=claude_body, | |
modelId=model_id, | |
accept="application/json", | |
contentType="application/json", | |
) | |
response_body = json.loads(response.get("body").read()) | |
json_response = json.loads(response_body["content"][0]['text']) | |
return json_response['row'], json_response['col'], json_response['entertainment_comment'] | |
# Game Setup Functions | |
def create_empty_board(): | |
"""Create an empty game board.""" | |
return [[EMPTY for _ in range(BOARD_SIZE)] for _ in range(BOARD_SIZE)] | |
def is_valid_position(row, col): | |
"""Check if a position is within the board boundaries.""" | |
return 0 <= row < BOARD_SIZE and 0 <= col < BOARD_SIZE | |
def check_adjacent_cells(board, row, col): | |
"""Check if any adjacent cells (including diagonals) contain a ship.""" | |
for i in range(-1, 2): | |
for j in range(-1, 2): | |
if i == 0 and j == 0: | |
continue | |
new_row, new_col = row + i, col + j | |
if is_valid_position(new_row, new_col) and board[new_row][new_col] != EMPTY: | |
return False | |
return True | |
def can_place_ship(board, row, col, size, direction): | |
"""Check if a ship can be placed at the given position.""" | |
if direction == 'horizontal': | |
if col + size > BOARD_SIZE: | |
return False | |
# Check the ship's cells and their adjacent cells | |
for i in range(-1, size + 1): | |
for j in range(-1, 2): | |
check_row = row + j | |
check_col = col + i | |
if is_valid_position(check_row, check_col): | |
if board[check_row][check_col] != EMPTY: | |
return False | |
else: # vertical | |
if row + size > BOARD_SIZE: | |
return False | |
# Check the ship's cells and their adjacent cells | |
for i in range(-1, 2): | |
for j in range(-1, size + 1): | |
check_row = row + j | |
check_col = col + i | |
if is_valid_position(check_row, check_col): | |
if board[check_row][check_col] != EMPTY: | |
return False | |
return True | |
def place_ships_randomly(board): | |
"""Place ships randomly on the board.""" | |
for ship in SHIPS: | |
while True: | |
row = random.randint(0, BOARD_SIZE - 1) | |
col = random.randint(0, BOARD_SIZE - 1) | |
direction = random.choice(['horizontal', 'vertical']) | |
if can_place_ship(board, row, col, ship['size'], direction): | |
place_ship(board, row, col, ship['size'], direction, ship['symbol']) | |
break | |
return board | |
def place_ship(board, row, col, size, direction, symbol): | |
"""Place a ship on the board.""" | |
if direction == 'horizontal': | |
for i in range(size): | |
board[row][col+i] = symbol | |
else: | |
for i in range(size): | |
board[row+i][col] = symbol | |
def initialize_game(): | |
"""Initialize the game state.""" | |
if 'difficulty' not in st.session_state: | |
st.session_state.difficulty = "Beginner" | |
if 'game_state' not in st.session_state: | |
st.session_state.game_state = { | |
'player_board': place_ships_randomly(create_empty_board()), | |
'opponent_board': place_ships_randomly(create_empty_board()), | |
'player_attacks': create_empty_board(), | |
'player_hits_left': 17, | |
'opponent_attacks': create_empty_board(), | |
'openent_moves': [], | |
'opponent_hits_left': 17, | |
'current_player': 'player', | |
'game_state': 'not_started', | |
'game_over': False, | |
'message': '' | |
} | |
if 'game_id' not in st.session_state.game_state: | |
st.session_state.game_state['game_id'] = ObjectId() | |
gameId = st.session_state.game_state['game_id'] | |
# Initialize game data in MongoDB | |
games.update_one({'game_id': gameId, 'type': 'player'}, | |
{"$set": {'game_id': gameId, | |
'board': st.session_state.game_state['player_board'], | |
'attacking_board': st.session_state.game_state['player_attacks']}}, | |
upsert=True) | |
games.update_one({'game_id': gameId, 'type': 'opponent'}, | |
{"$set": {'game_id': gameId, | |
'board': st.session_state.game_state['opponent_board'], | |
'oponent_moves': st.session_state.game_state['openent_moves'], | |
'attacking_board': st.session_state.game_state['opponent_attacks']}}, | |
upsert=True) | |
def check_game_over(): | |
"""Check if the game is over.""" | |
game_state = st.session_state.game_state | |
if game_state['player_hits_left'] == 0: | |
game_state['message'] = "You won!" | |
game_state['game_over'] = True | |
return True | |
elif game_state['opponent_hits_left'] == 0: | |
game_state['message'] = "You lost!" | |
game_state['game_over'] = True | |
return True | |
def render_board(board, is_opponent=False): | |
"""Render the game board using Streamlit components.""" | |
for i, row in enumerate(board): | |
cols = st.columns(BOARD_SIZE) | |
for j, cell in enumerate(row): | |
with cols[j]: | |
if is_opponent and cell == EMPTY: | |
if st.button('🌊', key=f'opp_{i}_{j}', on_click=lambda r=i, c=j: attack(r, c)): | |
pass | |
else: | |
# Determine the button color based on the cell content | |
if cell == EMPTY: | |
button_color = 'secondary' | |
elif cell in [HIT, MISS]: | |
button_color = 'primary' # Use default color for hits and misses | |
else: | |
button_color = 'primary' if not is_opponent else 'secondary' | |
# Use Streamlit's button with theme colors | |
st.button(cell, key=f'{"opp" if is_opponent else "player"}_{i}_{j}', type=button_color) | |
def attack(row, col): | |
"""Process a player's attack.""" | |
game_state = st.session_state.game_state | |
if game_state['opponent_board'][row][col] != EMPTY: | |
game_state['player_attacks'][row][col] = HIT | |
game_state['message'] = "You hit a ship!" | |
game_state['opponent_hits_left'] -= 1 | |
else: | |
game_state['player_attacks'][row][col] = MISS | |
game_state['message'] = "You missed!" | |
update_database('player') | |
game_state['current_player'] = 'opponent' | |
def update_database(type): | |
"""Update the game state in the database.""" | |
game_state = st.session_state.game_state | |
gameId = game_state['game_id'] | |
games.update_one({'game_id': gameId, 'type': type}, | |
{'$set': {'attacking_board': game_state['player_attacks'], | |
'oponent_moves': game_state['openent_moves']}}) | |
def opponent_turn(): | |
"""Process the opponent's turn.""" | |
game_state = st.session_state.game_state | |
with st.spinner("Opponent is making a move..."): | |
opponent_doc = games.find_one({'game_id': ObjectId(game_state['game_id']), 'type': 'opponent'}) | |
print("opponent_doc:", opponent_doc) | |
row, col, comment = get_bedrock_claude_move(opponent_doc['attacking_board'], openent_moves=opponent_doc['oponent_moves']) | |
game_state['current_player'] = 'player' | |
if game_state['player_board'][row][col] != EMPTY and game_state['opponent_attacks'][row][col] != MISS: | |
game_state['openent_moves'].append({'row': row, 'col': col, 'status': 'hit'}) | |
game_state['opponent_attacks'][row][col] = HIT | |
game_state['player_board'][row][col] = HIT | |
game_state['player_hits_left'] -= 1 | |
game_state['message'] = f"Opponent hit your ship! Cell row {row} col {col} Opponent: {comment}" | |
else: | |
game_state['openent_moves'].append({'row': row, 'col': col, 'status': 'miss'}) | |
game_state['opponent_attacks'][row][col] = MISS | |
game_state['player_board'][row][col] = MISS | |
game_state['message'] = f"Opponent missed! Cell row {row} col {col}, Opponent: {comment}" | |
update_database('opponent') | |
game_state['current_player'] = 'player' | |
def main(): | |
"""Main function to run the Battleship game.""" | |
st.title('Battleship Game') | |
# Initialize difficulty in session state if not present | |
if 'difficulty' not in st.session_state: | |
st.session_state.difficulty = "Beginner" | |
# Add difficulty selector | |
st.session_state.difficulty = st.selectbox( | |
"Select Difficulty", | |
["Beginner", "Expert"], | |
index=0 if st.session_state.difficulty == "Beginner" else 1 | |
) | |
initialize_game() | |
if st.session_state.game_state['game_over']: | |
st.subheader('Game Over! You Lost!' if st.session_state.game_state['player_hits_left'] == 0 else 'You Won!') | |
else: | |
st.subheader('Your Turn') | |
col1, col3, col2 = st.columns(3) | |
with col1: | |
st.subheader('Your Board') | |
render_board(st.session_state.game_state['player_board']) | |
with col3: | |
st.subheader("Opponent Ships") | |
for ship in SHIPS: | |
my_ships, status = st.columns(2) | |
with my_ships: | |
st.write(f" {ship['symbol']} {ship['name']}, size: {ship['size']}") | |
with status: | |
st.checkbox("Sunk", key=f"{ship['name']}_sunk") | |
st.markdown("#### Opponent history") | |
container = st.container(height=250) | |
for move in st.session_state.game_state['openent_moves']: | |
container.write(f"Row: {move['row']} Col: {move['col']} Status: {move['status']}") | |
with col2: | |
st.subheader("Opponent's Board") | |
render_board(st.session_state.game_state['player_attacks'], is_opponent=True) | |
st.write(st.session_state.game_state['message']) | |
if st.button('Reset Game'): | |
st.session_state.clear() | |
st.rerun() | |
if st.session_state.game_state['current_player'] == 'opponent': | |
opponent_turn() | |
if check_game_over(): | |
st.session_state.game_state['current_player'] = 'game_over' | |
else: | |
st.rerun() | |
st.header('Debug') | |
expander = st.expander("Game State") | |
for row in st.session_state.game_state['opponent_board']: | |
expander.markdown(row) | |
if st.button("Chat"): | |
pop_chat() | |
if __name__ == '__main__': | |
main() | |