[confluent@localhost ~]$ cat tweet_producer.py
"""
-----------------------------------------------------------------------------------------------------
Author: Steve Howard
Date: February 13, 2019
Purpose: Simple Kafka producer which consumes tweets and enqueues them in a topic.
Kafka Connect has a connector for Twitter; this is just a plumbing POC.
-----------------------------------------------------------------------------------------------------
"""
from TwitterAPI import TwitterAPI
import sys, json, datetime
from json import dumps
from kafka import KafkaProducer
# The search term to track is required as the first command-line argument.
# Exit with a usage message instead of an IndexError traceback when missing.
if len(sys.argv) < 2:
    sys.exit('usage: tweet_producer.py <track-term>')
term = sys.argv[1]

# Every value handed to send() is serialized as UTF-8 encoded JSON.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8'),
)

# Twitter API credentials (masked in this listing).
# NOTE(review): presumably consumer key/secret followed by access token
# key/secret -- confirm against the TwitterAPI constructor signature.
api = TwitterAPI("*****",
                 "*****",
                 "*****",
                 "*****")

# Open the filtered stream for the requested term.
r = api.request('statuses/filter', {'track': term})

# Fields read from each stream message when building the Kafka document.
required_keys = ('user', 'id', 'text', 'timestamp_ms')

try:
    for item in r:
        # The stream can interleave non-tweet messages (e.g. limit or
        # disconnect notices) that lack these fields; report and skip them
        # rather than dying with a KeyError.
        if not all(key in item for key in required_keys):
            print('skipping non-tweet message: ' + str(item), file=sys.stderr)
            continue

        doc = {
            'author': item['user']['screen_name'],
            'id': item['id'],
            'text': item['text'],
            'timestamp': item['timestamp_ms'],
        }
        print(str(datetime.datetime.now().timestamp()) + '\t' + str(doc))
        producer.send('tweets', value=doc)
        # Block until the record has been sent, so every printed tweet is
        # actually in the topic before the next one is processed.
        producer.flush()
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the stream; exit quietly.
    sys.exit(0)
finally:
    # Release the producer's network resources on any exit path.
    producer.close()
[confluent@localhost ~]$ cat tweet_consumer.py
"""
-----------------------------------------------------------------------------------------------------
Author: Steve Howard
Date: February 13, 2019
Purpose: Simple Kafka consumer which propagates tweets dequeued from a topic to Elasticsearch.
Kafka Connect has a connector for Elastic; this is just a plumbing POC.
-----------------------------------------------------------------------------------------------------
"""
from datetime import datetime
from elasticsearch import Elasticsearch
from kafka import KafkaConsumer
from json import loads
import time, sys, signal, datetime
# Elasticsearch client; credentials are embedded in the URL (masked in this listing).
es = Elasticsearch("https://****:****@kafkapoc-7051606219.us-east-1.bonsaisearch.net")

# Subscribe to the 'tweets' topic and decode each value from UTF-8 JSON.
# No group_id is configured, so (per kafka-python's documented defaults) offsets
# are not committed and, with auto_offset_reset='latest', each run should only
# see tweets produced after it starts.
consumer = KafkaConsumer(
    'tweets',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',
    value_deserializer=lambda x: loads(x.decode('utf-8')),
)

try:
    for message in consumer:
        # Index under the tweet's own id.
        # NOTE(review): doc_type is deprecated in newer Elasticsearch releases --
        # confirm the target cluster version before upgrading the client.
        es.index(index="tweets", doc_type='tweet', id=message.value['id'], body=message.value)
        # `datetime` here is the module: the later `import datetime` in this
        # file rebinds the earlier `from datetime import datetime`.
        print(str(datetime.datetime.now().timestamp()) + '\t' + str(message.value))
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the consumer; exit quietly.
    sys.exit(0)
finally:
    # Release the consumer's network resources on any exit path.
    consumer.close()
[confluent@localhost ~]$