Now that we have a working in memory hardcoded table with insert and select. Let’s add a bunch of validations to make our code less error prone. The prepare function now split in two would look like so.
def prepare_insert(user_input):
parts=user_input.split()
if len(parts)!=4:
return "PREPARE_SYNTAX_ERROR",None
try:
id_val=int(parts[1])
except ValueError:
return "PREPARE_SYNTAX_ERROR", None
if id_val < 0:
return "PREPARE_NEGATIVE_ID", None
username=parts[2]
email=parts[3]
username_bytes = username.encode('utf-8')
email_bytes = email.encode('utf-8')
if len(username_bytes) > COLUMN_USERNAME_SIZE:
return "PREPARE_STRING_TOO_LONG", None
if len(email_bytes) > COLUMN_EMAIL_SIZE:
return "PREPARE_STRING_TOO_LONG", None
# We can pass the original strings forward since our serialize_row
# function handles the encoding, or pass the bytes to avoid double-encoding.
return "PREPARE_SUCCESS", {"type": "insert", "data": (id_val, username, email)}
def prepare_statement(user_input):
if user_input.startswith("insert"):
return prepare_insert(user_input)
if user_input == "select":
return "PREPARE_SUCCESS", {"type": "select"}
return "PREPARE_UNRECOGNIZED_STATEMENT", None
The main function will look like so
def main():
table=Table()
while True:
print_prompt()
user_input = read_input()
if user_input.startswith('.'):
result = do_meta_command(user_input)
if result == MetaCommandResult.SUCCESS:
continue
elif result == MetaCommandResult.UNRECOGNIZED_COMMAND:
print(f"Unrecognized command '{user_input}'")
continue
# 2. Prepare Statement
result_code,statement = prepare_statement(user_input)
if result_code == "PREPARE_SYNTAX_ERROR":
print("Syntax error. Could not parse statement.")
continue
elif result_code == "PREPARE_STRING_TOO_LONG":
print("String is too long.")
continue
elif result_code == "PREPARE_NEGATIVE_ID":
print("ID must be positive.")
continue
elif result_code == "PREPARE_UNRECOGNIZED_STATEMENT":
print(f"Unrecognized keyword at start of '{user_input}'.")
continue
if statement["type"] == "insert":
result = execute_insert(statement, table)
if result == "TABLE_FULL":
print("Error: Table full.")
else:
print("Executed.")
elif statement["type"] == "select":
execute_select(table)
print("Executed.")
Now let’s add unittests
Let us start by creating a function that accepts inputs from a list and mocks inputs by a human. The function starts by crrating a process that runs the command to run the file. By putting stdin as subprocess.PIPE we can enter inputs the PIPE also returns a result of array.
text=True ensures the data is handled as regular text (strings) rather than raw binary data (bytes).
import unittest
import subprocess
def run_script(commands):
"""
Spawns the database process, feeds it the commands,
and returns the output as a single string.
"""
process = subprocess.Popen(
["python3", "REPL.py"], # Make sure your main file is named db.py
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
text=True
)
# Join commands with a newline and send them to the process
input_data = "\\n".join(commands) + "\\n"
stdout, stderr = process.communicate(input_data)
return stdout
The process.communicate(input_data)
input_data to your database's stdin..exit command).stdout) and any errors (stderr).Now we create tests,
**class** TestDatabase(unittest.TestCase):
We test for:
def test_inserts_and_retrieves_row(self):
result = run_script([
"insert 1 user1 person1@example.com",
"select",
".exit",
])
self.assertIn("Executed.", result)
self.assertIn("(1, user1, person1@example.com)", result)