Creating Twitter Producer

In this section, we will learn to create a Twitter producer. There are four steps to create a Twitter producer:
Step1: Create a new Java package, following the package naming convention rules. Then, create a Java class within it, say 'tweetproducer.java'.

Step2: Create a Twitter client by creating a method for it. Copy the Quickstart code from the 'github twitter java' page and paste it into the newly created method. This code creates a connection between the client and the hbc host. A BlockingQueue makes a thread wait when it tries to dequeue from an empty queue or enqueue into a full one. As we are using hbc-core, we only require the msgQueue. Also, we will follow terms, not people. Therefore, copy only the parts of the Quickstart code that set up the msgQueue and the tracked terms. Next, copy the 'Creating a client' code given below the connection code and paste it below the connection code. This code creates a Twitter client through the client builder. As we are using msgQueue, do not copy the part that refers to the eventMessageQueue. It is not required.

Step3: Create the producer in the same way we learned in the previous sections, with a bootstrap server connection.

Step4: After creating the Kafka producer, it's time to send tweets to Kafka. Copy the while loop code from the 'github twitter java' page, given below the 'Creating a client' code, and paste it below the producer code. Now, we are ready to read tweets from Twitter. However, a Kafka producer writes messages to a topic, so create the topic first using the '--create' option of the kafka-topics command on the CLI. Also, specify the number of partitions and the replication factor. For example, the topic 'twitter_topic' can be created with 6 partitions and a replication factor of 1. Finally, execute the code and experience Kafka in a real-world application.
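The queue behaviour described in Step2 and the polling loop from Step4 can be tried without Twitter or Kafka. The following is a minimal sketch using only the Java standard library. The class name QueueSketch and the helper methods offerAll and pollOnce are hypothetical names chosen for illustration, and the sketch does not claim to show how hbc itself enqueues messages.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows how a bounded BlockingQueue behaves when full or empty.
class QueueSketch {

    // Tries to add each message without waiting; returns how many were accepted.
    static int offerAll(BlockingQueue<String> queue, String... messages) {
        int accepted = 0;
        for (String message : messages) {
            // offer() returns false immediately when the queue is already full
            if (queue.offer(message)) {
                accepted++;
            }
        }
        return accepted;
    }

    // Waits up to timeoutMillis for a message; returns null if none arrives in time.
    static String pollOnce(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status for the caller
            return null;
        }
    }

    public static void main(String[] args) {
        // Capacity of 2 is an assumption for the demo; the tutorial code uses 1000.
        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(2);

        System.out.println(offerAll(msgQueue, "tweet-1", "tweet-2", "tweet-3")); // prints 2
        System.out.println(pollOnce(msgQueue, 100)); // prints tweet-1
        System.out.println(pollOnce(msgQueue, 100)); // prints tweet-2
        System.out.println(pollOnce(msgQueue, 100)); // prints null (queue is empty)
    }
}
```

A timed poll that returns null is the reason the tutorial loop checks 'msg != null' before sending a record to Kafka.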
The complete code for creating the Twitter Client is given below:

package com.github.learnkafka;

import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class tweetproducer {

    Logger logger = LoggerFactory.getLogger(tweetproducer.class.getName());

    String consumerKey = "";    // specify the consumer key from the Twitter app
    String consumerSecret = ""; // specify the consumer secret from the Twitter app
    String token = "";          // specify the token from the Twitter app
    String secret = "";         // specify the token secret from the Twitter app

    public tweetproducer() {}

    public static void main(String[] args) {
        new tweetproducer().run();
    }

    public void run() {
        logger.info("Setup");

        // Bounded queue that receives the raw tweets; specify the size accordingly.
        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);

        Client client = tweetclient(msgQueue);
        client.connect(); // attempts to establish a connection

        KafkaProducer<String, String> producer = createKafkaProducer();

        // on a different thread, or multiple different threads....
        while (!client.isDone()) {
            String msg = null;
            try {
                msg = msgQueue.poll(5, TimeUnit.SECONDS); // specify the time to wait
            } catch (InterruptedException e) {
                e.printStackTrace();
                client.stop();
            }
            if (msg != null) {
                logger.info(msg);
                // Specify the topic name, the key (null here), and the message.
                producer.send(new ProducerRecord<>("twitter_topic", null, msg), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            logger.error("Something went wrong", e);
                        }
                    }
                });
            }
        }

        producer.close(); // flush pending records and release resources
        logger.info("This is the end"); // when the reading is complete, inform the logger
    }

    public Client tweetclient(BlockingQueue<String> msgQueue) {
        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();

        // Terms for which we want to read the tweets.
        List<String> terms = Lists.newArrayList("India");
        hosebirdEndpoint.trackTerms(terms);

        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);

        ClientBuilder builder = new ClientBuilder()
                .name("Hosebird-Client-01") // optional: mainly for the logs
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));

        Client hosebirdClient = builder.build();
        return hosebirdClient;
    }

    public KafkaProducer<String, String> createKafkaProducer() {
        // creating producer properties
        String bootstrapServers = "127.0.0.1:9092";
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // creating the Kafka producer
        KafkaProducer<String, String> first_producer = new KafkaProducer<String, String>(properties);
        return first_producer;
    }
}

In the above code, the user must specify the consumerKey, consumerSecret, token, and secret values. They are left empty here because they are sensitive information. Copy the keys from the 'Keys and Tokens' section of the app on 'developer.twitter.com' and paste them at their respective positions in the code.

When the code runs, the client establishes a connection with Hosebird. After this, a stream of tweets mentioning 'India' appears in the log. Post some tweets on any specified term and try it out.

Try out the 'kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic twitter_topic' command on the CLI. The output will be the same as on the IntelliJ IDEA terminal.

In this way, we can create a real Twitter-Kafka-Producer and send tweets to Kafka.
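Because the four keys are sensitive, one option is to keep them out of the source file and read them from environment variables instead. The following is a minimal sketch of that idea, not part of the original tutorial code. The class name TwitterCredentials and the variable names TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_TOKEN, and TWITTER_TOKEN_SECRET are assumptions chosen for illustration.

```java
import java.util.Map;

// Hypothetical holder for the four values that the tutorial passes to OAuth1.
class TwitterCredentials {
    final String consumerKey;
    final String consumerSecret;
    final String token;
    final String secret;

    TwitterCredentials(String consumerKey, String consumerSecret, String token, String secret) {
        this.consumerKey = consumerKey;
        this.consumerSecret = consumerSecret;
        this.token = token;
        this.secret = secret;
    }

    // Builds the credentials from a map such as System.getenv().
    // The variable names below are assumptions, not names required by Twitter or hbc.
    static TwitterCredentials fromEnv(Map<String, String> env) {
        return new TwitterCredentials(
                require(env, "TWITTER_CONSUMER_KEY"),
                require(env, "TWITTER_CONSUMER_SECRET"),
                require(env, "TWITTER_TOKEN"),
                require(env, "TWITTER_TOKEN_SECRET"));
    }

    // Fails fast with a clear message when a variable is missing or blank.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            TwitterCredentials credentials = fromEnv(System.getenv());
            // Never log the secrets themselves; only confirm that they were found.
            System.out.println("Loaded credentials, consumer key length: "
                    + credentials.consumerKey.length());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With such a class, the fields in tweetproducer could be filled from 'TwitterCredentials.fromEnv(System.getenv())' rather than from string literals, so the keys never appear in the source file.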