Consume tweets through Kafka and push to Elastic

[confluent@localhost ~]$ cat tweet_producer.py
"""
-----------------------------------------------------------------------------------------------------
Author:         Steve Howard
Date:           February 13, 2019
Purpose:        Simple Kafka producer which consumes tweets and enqueues in topic
                  Kafka Connect has a connector for Twitter, this is just a plumbing POC
-----------------------------------------------------------------------------------------------------
"""

from TwitterAPI import TwitterAPI
import sys, json, datetime
from json import dumps
from kafka import KafkaProducer

# Stream tweets matching a command-line search term from the Twitter
# statuses/filter endpoint and publish a trimmed JSON document per tweet
# to the local Kafka topic 'tweets'.

# Guard against a missing argument instead of crashing with IndexError.
if len(sys.argv) < 2:
  sys.stderr.write('usage: tweet_producer.py <search-term>\n')
  sys.exit(1)
term = sys.argv[1]

# Values are JSON-serialized so the consumer can decode with json.loads.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

api = TwitterAPI("*****",
                 "*****",
                 "*****",
                 "*****")

# Long-lived streaming request filtered on the search term.
r = api.request('statuses/filter', {'track': term})

try:
  for item in r:
    # Keep only the fields the downstream consumer indexes; the raw
    # tweet payload is much larger.
    doc = {
      'author': item['user']['screen_name'],
      'id': item['id'],
      'text': item['text'],
      'timestamp': item['timestamp_ms']
    }
    print(str(datetime.datetime.now().timestamp()) + '\t' + str(doc))
    producer.send('tweets', value=doc)
    # Flush per message: slow, but acceptable for this plumbing POC and
    # makes each tweet immediately visible to the consumer.
    producer.flush()
except KeyboardInterrupt:
  sys.exit(0)
finally:
  # Release the Kafka connection even on Ctrl-C or a stream error
  # (the original leaked it).
  producer.close()


[confluent@localhost ~]$ cat tweet_consumer.py
"""
-----------------------------------------------------------------------------------------------------
Author:         Steve Howard
Date:           February 13, 2019
Purpose:        Simple Kafka consumer which propagates tweets dequeued from topic to ElasticSearch
                  Kafka Connect has a connector for Elastic, this is just a plumbing POC
-----------------------------------------------------------------------------------------------------
"""
from datetime import datetime
from elasticsearch import Elasticsearch
from kafka import KafkaConsumer
from json import loads
import time, sys, signal, datetime

# Consume tweet documents from the Kafka topic 'tweets' and index each
# one into a hosted Elasticsearch cluster (credentials elided).
es = Elasticsearch("https://****:****@kafkapoc-7051606219.us-east-1.bonsaisearch.net")

# Start at the latest offset; values are JSON-decoded back into dicts,
# mirroring the producer's value_serializer.
# NOTE(review): auto-commit / group-id were left disabled in the POC:
#   enable_auto_commit=True,
#   group_id='my-group',
consumer = KafkaConsumer(
    'tweets',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',
    value_deserializer=lambda x: loads(x.decode('utf-8')))

try:
  for message in consumer:
    # Index by tweet id so re-delivery is idempotent: the same id
    # overwrites the existing document rather than duplicating it.
    res = es.index(index="tweets", doc_type='tweet', id=message.value['id'], body=message.value)
    print(str(datetime.datetime.now().timestamp()) + '\t' + str(message.value))
except KeyboardInterrupt:
  sys.exit(0)
finally:
  # Leave the consumer group cleanly so partition rebalance is prompt
  # (the original never closed the consumer).
  consumer.close()
[confluent@localhost ~]$

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.