We expect to receive the CST records in the form of CSV files.
New records should be ahead of the existing data in time, i.e. they cannot be randomly placed in the middle of the existing data.
There is only one customer and one company per conversation thread.
The NBA engine runs only for those conversations where the customer's reply is the last message and is waiting on a company response.
Reason:
Data ingestion is done by taking the CST records in the form of CSV files, then transforming and normalizing them into a single table. I’ve used JSON files for now. Each conversation starts with an initial message and may continue with follow-up replies. New data is checked: if it's already stored, we skip it; if it continues an existing conversation, we attach it to the end. The system runs regularly to catch new replies and keep the records current. It passes only the unprocessed records to the next step, i.e. tagging.
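As a rough sketch of the normalization step, the raw CSV rows can be parsed and type-coerced before anything touches the table. The column names below (tweet_id, inbound, response_tweet_id, in_response_to_tweet_id) are assumptions modeled on the payload fields described later; the sample data is hypothetical.

```python
import csv
import io

# Hypothetical CSV layout; column names are assumptions based on the
# chat-history payload fields described in this document.
SAMPLE = """tweet_id,author_id,inbound,created_at,text,response_tweet_id,in_response_to_tweet_id
8,115712,True,Tue Oct 31 21:45:10 +0000 2017,@sprintcare is the worst customer service,6,
6,sprintcare,False,Tue Oct 31 21:46:24 +0000 2017,@115712 Can you please send us a private message?,,8
"""

def normalize(csv_text):
    """Parse CSV rows and coerce types so downstream steps see uniform records."""
    rows = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        rows.append({
            "tweet_id": row["tweet_id"],
            "inbound": row["inbound"] == "True",          # customer tweets are inbound
            "created_at": row["created_at"],
            "text": row["text"],
            # empty strings become None so "null" checks work uniformly
            "response_tweet_id": row["response_tweet_id"] or None,
            "in_response_to_tweet_id": row["in_response_to_tweet_id"] or None,
        })
    return rows
```

In the prototype the normalized rows would then be written to a JSON file instead of a database table.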
erDiagram
    InteractionTable {
        string primary_tweet_id PK
        string primary_tweet
        JSONB Chat_history
        string customer_id
        string company_id
        string tail_id
        boolean processed
    }
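Since the prototype stores this table as JSON, the schema above can be mirrored in code; this dataclass is an illustrative assumption, not part of the project.

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class InteractionRecord:
    """In-memory mirror of one InteractionTable row; Chat_history (JSONB) is a list of dicts."""
    primary_tweet_id: str
    primary_tweet: str
    chat_history: list[dict[str, Any]] = field(default_factory=list)
    customer_id: str = ""
    company_id: str = ""
    tail_id: str = ""
    processed: bool = False
```

Keeping processed as a boolean on the row makes it cheap to select only the unprocessed conversations for the tagging step.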
Chat history payload format:
- inbound determines if it's a company or customer tweet
- response_tweet_id being null means that tweet is the last tweet of the thread
- in_response_to_tweet_id being null means that tweet is the start of the thread

"chat_history": [
{
"response_type": "Customer",
"response": {
"tweet_id": "8",
"text": "@sprintcare is the worst customer service",
"created_at": "Tue Oct 31 21:45:10 +0000 2017"
}
},
{
"response_type": "Company",
"response": {
"tweet_id": "6",
"text": "@115712 Can you please send us a private message, so that I can gain further details about your account?",
"created_at": "Tue Oct 31 21:46:24 +0000 2017"
}
}]
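The two null-field rules above are enough to reassemble a thread in order: start from the tweet whose in_response_to_tweet_id is null and follow the reply links until the tail. A minimal sketch, assuming each tweet has at most one reply (true for a single customer/company thread):

```python
def order_thread(tweets):
    """Order a thread's tweets chronologically using in_response_to_tweet_id links.

    The tweet whose in_response_to_tweet_id is None is the thread start;
    the tweet whose response_tweet_id is None is the tail.
    """
    by_parent = {t["in_response_to_tweet_id"]: t for t in tweets}
    ordered, parent = [], None          # parent=None finds the thread start first
    while parent in by_parent:
        tweet = by_parent[parent]
        ordered.append(tweet)
        parent = tweet["tweet_id"]      # next tweet replies to this one
    return ordered
```

The last element of the result is the thread tail, i.e. the tweet whose id becomes tail_id in the table.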
When new records are ingested, they are checked against existing data using the primary_tweet_id:
- If the primary_tweet_id already exists, the record is considered a duplicate and is ignored.
- If the primary_tweet_id matches the tail_id of an existing thread, it indicates a continuation of that conversation and is merged into the existing thread.

This approach ensures idempotent re-runs (i.e., reprocessing the same data causes no duplication) and enables seamless thread extensions for ongoing conversations.
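The two rules can be sketched as a single upsert against the table. This is an illustrative interpretation: I match the incoming record's in_response_to_tweet_id against an existing thread's tail_id to detect a continuation, and scan chat histories so re-runs stay idempotent; function and field names beyond those in the schema are assumptions.

```python
def upsert(record, table):
    """Apply the ingestion rules. `table` maps primary_tweet_id -> thread dict."""
    tid = record["tweet_id"]
    # Rule 1: duplicate anywhere (thread head or inside a chat history) -> ignore.
    if tid in table or any(
        m["tweet_id"] == tid for t in table.values() for m in t["chat_history"]
    ):
        return "duplicate"
    # Rule 2: replies to an existing thread's tail -> merge as a continuation.
    for thread in table.values():
        if thread["tail_id"] == record["in_response_to_tweet_id"]:
            thread["chat_history"].append(record)
            thread["tail_id"] = tid
            thread["processed"] = False   # new reply must go through tagging again
            return "merged"
    # Otherwise: start of a new conversation thread.
    table[tid] = {"primary_tweet_id": tid, "chat_history": [record],
                  "tail_id": tid, "processed": False}
    return "new"
```

Running the same batch twice returns "duplicate" for every record the second time, which is what makes re-runs safe.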
Postgres is the right way to store this data, since the table has both relational fields (customer_id, primary_tweet_id) and a nested Chat_history (JSONB); it also allows indexing.
I have not integrated a database into this project, as it is currently a prototype. Instead, I used JSON files to store all data and intermediate results for better understanding and readability.
We can run the data ingestion pipeline every few hours, since we need to know whether the customer has replied so we can give a faster response. For scheduling we can use Airflow, because it provides a reliable, scalable way to run the job on a schedule while offering features like automatic retries and logging. A simple cron job would work too.
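For the prototype, a plain polling loop can stand in for Airflow or cron; the function below is a hypothetical stand-in (the max_runs parameter exists only so the sketch can terminate in tests).

```python
import time

def run_every(job, interval_seconds, max_runs=None):
    """Minimal scheduler stand-in: run `job`, sleep, repeat.

    In production this loop would be replaced by an Airflow DAG or a cron
    entry running every few hours.
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        job()                              # e.g. the ingestion pipeline entry point
        runs += 1
        if max_runs is None or runs < max_runs:
            time.sleep(interval_seconds)
```

With Airflow, the same job body would become a task inside a DAG with a schedule interval of a few hours, gaining retries and logging for free.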