{"id":6901,"date":"2019-02-20T08:27:15","date_gmt":"2019-02-20T13:27:15","guid":{"rendered":"http:\/\/appcrawler.com\/wordpress\/?p=6901"},"modified":"2019-02-20T08:29:04","modified_gmt":"2019-02-20T13:29:04","slug":"consume-tweets-through-kafka-and-push-to-elastic","status":"publish","type":"post","link":"http:\/\/appcrawler.com\/wordpress\/2019\/02\/20\/consume-tweets-through-kafka-and-push-to-elastic\/","title":{"rendered":"Consume tweets through Kafka and push to Elastic"},"content":{"rendered":"\n<pre class=\"wp-block-code\"><code>[confluent@localhost ~]$ cat tweet_producer.py\n\"\"\"\n-----------------------------------------------------------------------------------------------------\nAuthor:         Steve Howard\nDate:           February 13, 2019\nPurpose:        Simple Kafka producer which consumes tweets and enqueues in topic\n                  Kafka Connect has a connector for Twitter, this is just a plumbing POC\n-----------------------------------------------------------------------------------------------------\n\"\"\"\n\nfrom TwitterAPI import TwitterAPI\nimport sys, json, datetime\nfrom json import dumps\nfrom kafka import KafkaProducer\n\nterm = sys.argv[1]\n\nproducer = KafkaProducer(bootstrap_servers=['localhost:9092'],\n                         value_serializer=lambda x: dumps(x).encode('utf-8'))\n\napi = TwitterAPI(\"*****\",\n                 \"*****\",\n                 \"*****\",\n                 \"*****\")\n\nr = api.request('statuses\/filter', {'track': term})\n\ntry:\n  for item in r:\n    doc = {\n      'author': item['user']['screen_name'],\n      'id': item['id'],\n      'text': item['text'],\n      'timestamp': item['timestamp_ms']\n    }\n    print(str(datetime.datetime.now().timestamp()) + '\\t' + str(doc))\n    producer.send('tweets', value=doc)\n    producer.flush()\nexcept KeyboardInterrupt:\n  sys.exit(0)\n\n\n[confluent@localhost ~]$ cat 
tweet_consumer.py\n\"\"\"\n-----------------------------------------------------------------------------------------------------\nAuthor:         Steve Howard\nDate:           February 13, 2019\nPurpose:        Simple Kafka consumer which propagates tweets dequeued from topic to ElasticSearch\n                  Kafka Connect has a connector for Elastic, this is just a plumbing POC\n-----------------------------------------------------------------------------------------------------\n\"\"\"\nfrom datetime import datetime\nfrom elasticsearch import Elasticsearch\nfrom kafka import KafkaConsumer\nfrom json import loads\nimport time, sys, signal, datetime\n\nes = Elasticsearch(\"https:\/\/****:****@kafkapoc-7051606219.us-east-1.bonsaisearch.net\")\n\nconsumer = KafkaConsumer(\n    'tweets',\n     bootstrap_servers=['localhost:9092'],\n     auto_offset_reset='latest',\n     value_deserializer=lambda x: loads(x.decode('utf-8')))\n     #enable_auto_commit=True,\n     #group_id='my-group',\n\ntry:\n  for message in consumer:\n    res = es.index(index=\"tweets\", doc_type='tweet', id=message.value['id'], body=message.value)\n    print(str(datetime.datetime.now().timestamp()) + '\\t' + str(message.value))\nexcept KeyboardInterrupt:\n    sys.exit(0)\n[confluent@localhost ~]$<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p class=\"more-link-p\"><a class=\"more-link\" href=\"http:\/\/appcrawler.com\/wordpress\/2019\/02\/20\/consume-tweets-through-kafka-and-push-to-elastic\/\">Read more 
&rarr;<\/a><\/p>\n","protected":false},"author":2,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_mi_skip_tracking":false,"footnotes":""},"categories":[1],"tags":[],"_links":{"self":[{"href":"http:\/\/appcrawler.com\/wordpress\/wp-json\/wp\/v2\/posts\/6901"}],"collection":[{"href":"http:\/\/appcrawler.com\/wordpress\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/appcrawler.com\/wordpress\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/appcrawler.com\/wordpress\/wp-json\/wp\/v2\/users\/2"}],"replies":[{"embeddable":true,"href":"http:\/\/appcrawler.com\/wordpress\/wp-json\/wp\/v2\/comments?post=6901"}],"version-history":[{"count":3,"href":"http:\/\/appcrawler.com\/wordpress\/wp-json\/wp\/v2\/posts\/6901\/revisions"}],"predecessor-version":[{"id":6904,"href":"http:\/\/appcrawler.com\/wordpress\/wp-json\/wp\/v2\/posts\/6901\/revisions\/6904"}],"wp:attachment":[{"href":"http:\/\/appcrawler.com\/wordpress\/wp-json\/wp\/v2\/media?parent=6901"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/appcrawler.com\/wordpress\/wp-json\/wp\/v2\/categories?post=6901"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/appcrawler.com\/wordpress\/wp-json\/wp\/v2\/tags?post=6901"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}