Quickstart¶
To set up a simple example, we will use a stream of HackerNews comments on articles to create feature tables that store the average comment sentiment for each article on HackerNews.
Creating a ralf server¶
To generate data, run the examples/stream_hn_comments.sh script inside the examples folder.
First, we connect to the comment stream by creating a Source:

```python
comments = ralf.from_kafka_source(topic="hn_comments")
```
Although the `comments` table is backed by a stream of data, we can treat it like a dataframe object that can be transformed into other tables. To calculate the sentiment of each comment, we can create an Operator object which loads and runs a sentiment model:
```python
from transformers import pipeline  # Hugging Face sentiment model

from ralf import Operator, Record

class CommentSentiment(Operator):
    def __init__(self, schema):
        super().__init__(schema)
        self.model = pipeline("sentiment-analysis")

    def on_record(self, record: Record):
        return Record(
            title=record.title,
            comment_id=record.comment_id,
            sentiment=self.model(record.text),
        )
```
```python
sentiment = comments.map(CommentSentiment, args=(sentiment_schema,))
```
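To build intuition for what the map does here, the per-record transformation can be sketched in plain Python. This is an illustration only: it uses dicts in place of ralf's Record and a stub function in place of the Hugging Face pipeline, not ralf's actual internals.

```python
# Plain-Python sketch of a map-style operator: each incoming record is
# transformed by on_record and emitted downstream. Records are dicts here;
# fake_sentiment_model is a stub standing in for the real sentiment model.
def fake_sentiment_model(text):
    # Stub: positive if the comment contains "great", else negative.
    return "POSITIVE" if "great" in text else "NEGATIVE"

def comment_sentiment(record):
    # Mirrors CommentSentiment.on_record: keep identifying fields,
    # attach a sentiment label.
    return {
        "title": record["title"],
        "comment_id": record["comment_id"],
        "sentiment": fake_sentiment_model(record["text"]),
    }

comments = [
    {"title": "Show HN", "comment_id": 1, "text": "great project"},
    {"title": "Show HN", "comment_id": 2, "text": "broken on mobile"},
]
sentiment = [comment_sentiment(r) for r in comments]
```

Each output record carries the input's keys plus the derived `sentiment` field, which is what lets downstream operators group and aggregate on it.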
Finally, to create tables tracking the average comment sentiment per user and per article, we can define a `GroupByAverage` operator that groups records by a key and averages a value:
```python
from collections import defaultdict

import numpy as np

class GroupByAverage(Operator):
    def __init__(self, key: str, value: str, schema):
        super().__init__(schema)
        self.state = defaultdict(list)
        self.key = key
        self.value = value

    def on_record(self, record: Record):
        key = record.get(self.key)
        self.state[key].append(record.get(self.value))
        return Record(key=key, value=np.mean(self.state[key]))
```
```python
user_sentiment = sentiment.map(GroupByAverage, args=("user", "sentiment", user_sentiment_schema))
article_sentiment = sentiment.map(GroupByAverage, args=("title", "sentiment", title_sentiment_schema))
```
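Note that `GroupByAverage` keeps every observed value per key, so its state grows without bound. A constant-space alternative is to keep a running `(count, total)` pair per key and derive the mean on each update. The sketch below illustrates that pattern in plain Python; it is a design note, not part of ralf's API.

```python
from collections import defaultdict

# Constant-space running mean per key: store (count, total) instead of
# the full list of values, and recompute the mean on each update.
class RunningAverage:
    def __init__(self):
        self.state = defaultdict(lambda: (0, 0.0))

    def update(self, key, value):
        count, total = self.state[key]
        count, total = count + 1, total + value
        self.state[key] = (count, total)
        return total / count

avg = RunningAverage()
avg.update("bob", 0.2)
result = avg.update("bob", 0.8)
```

Storing `(count, total)` gives the same averages as the list-based version while keeping per-key state O(1), which matters when comment streams are long-lived.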
To make our tables externally queryable, we can give each table a named handle:
```python
user_sentiment = (
    sentiment
    .map(GroupByAverage, args=("user", "sentiment", user_sentiment_schema))
    .as_queryable("user_sentiment")
)
article_sentiment = (
    sentiment
    .map(GroupByAverage, args=("title", "sentiment", title_sentiment_schema))
    .as_queryable("article_sentiment")
)
```
Finally, to run the server, we add:

```python
ralf.run()
```
Creating a ralf client¶
Ralf tables set as queryable with the .as_queryable() method can be queried by the client. We can create a ralf client in a separate script with:
```python
from ralf import RalfClient

client = RalfClient()
```
The client can query a table by its primary key (set in the table schema) with the point_query() method:

```python
user_sentiment = client.point_query(table="user_sentiment", key="bob")
```
You can also query all values in the table with bulk_query(), which returns an iterator yielding batch_size rows at a time:

```python
for rows in client.bulk_query(table="user_sentiment", batch_size=100):
    for user_sentiment in rows:
        print(user_sentiment)
```
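The batching behavior of bulk_query can be pictured with a small helper that chunks any iterable into fixed-size batches. This illustrates the iteration pattern the client exposes; it is not ralf's client implementation.

```python
# Chunk any iterable into lists of up to batch_size items, yielding a
# final short batch if the total isn't an exact multiple.
def batched(rows, batch_size):
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # emit the final, possibly short, batch
        yield batch

batches = list(batched(range(5), batch_size=2))
```

Because the client yields batches lazily, callers can process large tables without holding every row in memory at once.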
TODO: Change to HTTPSource