TheDeveloperBlog.com

Creating Twitter Producer

In this section, we will learn to create a Twitter producer.

There are three basic steps to create a Twitter producer:

  1. Create a Twitter client.
  2. Create the producer.
  3. Send tweets.

Step 1: Create a new Java package, following the Java package naming conventions. Then create a Java class within it, say 'tweetproducer.java'.

Step 2: Create a Twitter client by writing a method for it. Then copy the Quickstart code from Twitter's hbc (Hosebird Client) Java library on GitHub ('github twitter java') into the Twitter client method, as shown below:

[Screenshot: hbc Quickstart connection code, with the part to copy highlighted]

Paste it into the newly created method. This code sets up the connection between the client and the Hosebird (hbc) stream host. The BlockingQueue makes a thread wait when it tries to dequeue from an empty queue or enqueue into a full one. As we are using hbc-core, we only need the msgQueue. Also, we will track terms rather than follow particular users. Therefore, copy only the highlighted code.
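
To see the queue behaviour described above in isolation, here is a small sketch that uses only the JDK. It gives the queue a deliberately tiny capacity of 2 (the listing uses 1000) and uses the non-blocking offer() and the timed poll() so that it returns instead of hanging; put() and take() are the variants that would block. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueBehaviourSketch {

    // Records what happens when a bounded queue is overfilled and then drained past empty.
    public static List<String> demo() {
        List<String> results = new ArrayList<>();
        // Bounded queue, like msgQueue in the listing, but with room for only 2 messages.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);

        results.add(String.valueOf(queue.offer("tweet 1"))); // "true"
        results.add(String.valueOf(queue.offer("tweet 2"))); // "true"
        // Queue is full: offer() gives up at once, whereas put() would block until space frees up.
        results.add(String.valueOf(queue.offer("tweet 3"))); // "false"

        results.add(queue.poll()); // "tweet 1" (FIFO order)
        results.add(queue.poll()); // "tweet 2"
        try {
            // Queue is empty: wait up to 100 ms, then get null; take() would block instead.
            results.add(String.valueOf(queue.poll(100, TimeUnit.MILLISECONDS))); // "null"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
            results.add("interrupted");
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [true, true, false, tweet 1, tweet 2, null]
    }
}
```

The timed poll() is the same call the producer loop uses on msgQueue, which is why that loop has to handle a null message.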

Next, copy the 'Creating a client' code that follows the connection code in the Quickstart:

[Screenshot: hbc Quickstart 'Creating a client' code]

Paste it below the connection code. This code builds a Twitter client with ClientBuilder. As we are using only msgQueue, do not copy the code highlighted in red, which sets the eventMessageQueue; it is not required.

Step 3: Create the Kafka producer the same way as in the previous sections, with a connection to the bootstrap server.
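
As a reminder of what that producer setup contains, the sketch below builds the same three settings that createKafkaProducer() in the complete listing uses, spelled with the literal configuration keys that the ProducerConfig constants stand for, so it runs with the JDK alone. Actually constructing the producer needs the kafka-clients library on the classpath, so that step is only a comment; the class and method names are illustrative.

```java
import java.util.Properties;

public class ProducerPropsSketch {

    // Same settings as createKafkaProducer() in the listing, written with literal config keys.
    static Properties producerProperties(String bootstrapServers) {
        String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.setProperty("key.serializer", stringSerializer);     // ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
        props.setProperty("value.serializer", stringSerializer);   // ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties("127.0.0.1:9092");
        // With kafka-clients on the classpath: new KafkaProducer<String, String>(props)
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```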

Step 4: Now that the Kafka producer is created, it's time to send tweets to Kafka. Copy the while loop from the hbc Quickstart (it follows the 'Creating a client' code) and paste it below the producer code.

[Screenshot: hbc Quickstart while loop]
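
The copied loop polls msgQueue with a timeout and forwards every non-null message to Kafka. The JDK-only sketch below imitates that pattern under stated assumptions: a background thread stands in for the hbc client, an AtomicBoolean named done plays the role of client.isDone(), and a plain list stands in for producer.send(). The class name and the fake tweets are hypothetical. One deliberate difference from the listing: this loop keeps draining until the queue is empty, so messages queued just before the source stops are not dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class PollLoopSketch {

    // Polls until the source is done AND the queue is drained, passing each message to send.
    static void pollLoop(BlockingQueue<String> queue, AtomicBoolean done, Consumer<String> send)
            throws InterruptedException {
        while (!done.get() || !queue.isEmpty()) {
            // Wait up to 100 ms for a message; null means the wait timed out, so loop again.
            String msg = queue.poll(100, TimeUnit.MILLISECONDS);
            if (msg != null) {
                send.accept(msg); // the real producer would call producer.send(...) here
            }
        }
    }

    public static List<String> run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
        AtomicBoolean done = new AtomicBoolean(false);
        List<String> sent = new ArrayList<>();

        // Stand-in for the hbc client thread: enqueue three fake tweets, then mark the source done.
        Thread fakeClient = new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) {
                    queue.put("tweet " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.set(true);
            }
        });
        fakeClient.start();

        try {
            pollLoop(queue, done, sent::add);
            fakeClient.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop early and keep the interrupt status
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [tweet 1, tweet 2, tweet 3]
    }
}
```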

Now we are ready to read tweets from Twitter. However, the producer writes those tweets to a Kafka topic, so the topic must exist first. Create it on the CLI with the kafka-topics '--create' option, and specify the number of partitions and the replication factor.

For example,

[Screenshot: creating 'twitter_topic' on the CLI]

Here, the topic 'twitter_topic' is created with 6 partitions and a replication factor of 1. Finally, run the code to see Kafka at work in a real-world application.

The complete code for the Twitter producer is given below:

package com.github.learnkafka;

import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class tweetproducer {
    Logger logger = LoggerFactory.getLogger(tweetproducer.class.getName());
    String consumerKey = "";//specify the consumer key from the twitter app
    String consumerSecret = "";//specify the consumerSecret key from the twitter app
    String token = "";//specify the token key from the twitter app
    String secret = "";//specify the secret key from the twitter app

    public tweetproducer() {} // default constructor; main() creates an instance and calls run()

    public static void main(String[] args) {
        new tweetproducer().run();
    }

    public void run() {
        logger.info("Setup");

        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000); // specify the queue size accordingly
        Client client = tweetclient(msgQueue);
        client.connect(); //invokes the connection function
        KafkaProducer<String,String> producer=createKafkaProducer();

        // keep reading tweets from the queue until the client stops
        while (!client.isDone()) {
            String msg = null;
            try {
                msg = msgQueue.poll(5, TimeUnit.SECONDS); // wait up to 5 seconds for the next tweet
            } catch (InterruptedException e) {
                logger.error("Interrupted while waiting for tweets", e);
                client.stop();
            }
            if (msg != null) {
                logger.info(msg);
                // topic name, key (null, so no key) and value (the raw message read from the stream)
                producer.send(new ProducerRecord<>("twitter_topic", null, msg), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            logger.error("Something went wrong", e);
                        }
                    }
                });
            }
        }

        producer.close(); // flush any buffered records and release the producer's resources
        logger.info("This is the end"); // the client has stopped, so log the end of the run
    }

    public Client tweetclient(BlockingQueue<String> msgQueue) {

        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
        List<String> terms = Lists.newArrayList("India"); // the terms to track; use anything you want to read tweets about
        hosebirdEndpoint.trackTerms(terms);
        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
        ClientBuilder builder = new ClientBuilder()
                .name("Hosebird-Client-01")     // optional: mainly for the logs
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));


        Client hosebirdClient = builder.build();
        return hosebirdClient; // not connected yet: run() calls connect() on it
    }

    public KafkaProducer<String, String> createKafkaProducer() {
        // producer properties: broker address plus String serializers for keys and values
        String bootstrapServers="127.0.0.1:9092";
        Properties properties= new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String,String> first_producer = new KafkaProducer<String, String>(properties);
        return first_producer;

    }
}

In the above code, fill in the consumerKey, consumerSecret, token and secret values. These are sensitive credentials, so they are not shown here. Copy them from your app on developer.twitter.com and paste each one into its respective field.

[Screenshot: 'Keys and Tokens' page of the Twitter app]

Copy the keys from 'Keys and Tokens' and paste them into the code.
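
Pasting the keys into the source works, but it keeps secrets inside the file. One alternative, sketched below with the JDK only, is to read the four values from environment variables at startup. The variable names (TWITTER_CONSUMER_KEY and so on) and the TwitterCredentials class are hypothetical; the resulting strings would be passed to new OAuth1(...) just as the fields are in the listing.

```java
import java.util.Map;

public class TwitterCredentials {
    public final String consumerKey;
    public final String consumerSecret;
    public final String token;
    public final String secret;

    TwitterCredentials(String consumerKey, String consumerSecret, String token, String secret) {
        this.consumerKey = consumerKey;
        this.consumerSecret = consumerSecret;
        this.token = token;
        this.secret = secret;
    }

    // Builds the credentials from a name -> value map, normally System.getenv().
    // The variable names are hypothetical; choose whatever fits your setup.
    public static TwitterCredentials fromEnv(Map<String, String> env) {
        return new TwitterCredentials(
                require(env, "TWITTER_CONSUMER_KEY"),
                require(env, "TWITTER_CONSUMER_SECRET"),
                require(env, "TWITTER_TOKEN"),
                require(env, "TWITTER_SECRET"));
    }

    // Fails fast with a clear message instead of connecting with an empty key.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing environment variable " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        TwitterCredentials creds = fromEnv(System.getenv());
        // e.g. new OAuth1(creds.consumerKey, creds.consumerSecret, creds.token, creds.secret)
        System.out.println("Loaded Twitter credentials");
    }
}
```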

The output of the above code looks like this:

[Screenshot: producer output in IntelliJ IDEA]

The client establishes a connection with the Hosebird host. After that, a large stream of tweets mentioning 'India' is produced. You can also post tweets containing a tracked term and watch them appear.

Next, run 'kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic twitter_topic' on the CLI. It shows the same tweets as the IntelliJ IDEA console:

[Screenshot: kafka-console-consumer output]

In this way, we can create a real Twitter-to-Kafka producer and send tweets to Kafka.

