我习惯使用python,并使用Scala Spark流媒体库处理实时Twitter流数据。现在,我可以作为字符串发送,但是,我的流服务需要JSON。有没有一种方法可以使我的代码轻松适应以JSON字典(而不是字符串)形式发送?%scalaimport scala.collection.JavaConverters._import com.microsoft.azure.eventhubs._import java.util.concurrent._val namespaceName = "hubnamespace"val eventHubName = "hubname"val sasKeyName = "RootManageSharedAccessKey"val sasKey = "key"val connStr = new ConnectionStringBuilder() .setNamespaceName(namespaceName) .setEventHubName(eventHubName) .setSasKeyName(sasKeyName) .setSasKey(sasKey)val pool = Executors.newFixedThreadPool(1)val eventHubClient = EventHubClient.create(connStr.toString(), pool)def sendEvent(message: String) = { val messageData = EventData.create(message.getBytes("UTF-8")) // CONVERT IT HERE? eventHubClient.get().send(messageData) System.out.println("Sent event: " + message + "\n")}import twitter4j._import twitter4j.TwitterFactoryimport twitter4j.Twitterimport twitter4j.conf.ConfigurationBuilderval twitterConsumerKey = "key"val twitterConsumerSecret = "key"val twitterOauthAccessToken = "key"val twitterOauthTokenSecret = "key"val cb = new ConfigurationBuilder() cb.setDebugEnabled(true) .setOAuthConsumerKey(twitterConsumerKey) .setOAuthConsumerSecret(twitterConsumerSecret) .setOAuthAccessToken(twitterOauthAccessToken) .setOAuthAccessTokenSecret(twitterOauthTokenSecret)val twitterFactory = new TwitterFactory(cb.build())val twitter = twitterFactory.getInstance()val query = new Query(" #happynewyear ")query.setCount(100)query.lang("en")var finished = falsewhile (!finished) { val result = twitter.search(query) val statuses = result.getTweets() var lowestStatusId = Long.MaxValue for (status <- statuses.asScala) { if(!status.isRetweet()){ sendEvent(status.getText()) } lowestStatusId = Math.min(status.getId(), lowestStatusId) Thread.sleep(2000) } query.setMaxId(lowestStatusId - 1)} eventHubClient.get().close()
1 回答
![?](http://img1.sycdn.imooc.com/54586870000183e302200220-100-100.jpg)
牧羊人nacy
TA贡献1862条经验 获得超7个赞
Scala没有将字符串转换为Json的本地方法,您需要使用一个外部库。我建议使用Jackson。如果你使用gradle这个你可以添加这样的依赖性:compile("com.fasterxml.jackson.module:jackson-module-scala_2.12")。(使用适当的scala版本)
然后,您可以像这样将数据对象简单地转换为JSON:
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
val json = valueToTree(messageData)
我强烈建议您向Jackson投入精力,如果您使用JSON,将非常需要它。
添加回答
举报
0/150
提交
取消