为了账号安全,请及时绑定邮箱和手机立即绑定

在Java / Scala中以JSON格式发送流数据

在Java / Scala中以JSON格式发送流数据

扬帆大鱼 2021-04-09 14:11:34
我习惯使用python,并使用Scala Spark流媒体库处理实时Twitter流数据。现在,我可以作为字符串发送,但是,我的流服务需要JSON。有没有一种方法可以使我的代码轻松适应以JSON字典(而不是字符串)形式发送?%scalaimport scala.collection.JavaConverters._import com.microsoft.azure.eventhubs._import java.util.concurrent._val namespaceName = "hubnamespace"val eventHubName = "hubname"val sasKeyName = "RootManageSharedAccessKey"val sasKey = "key"val connStr = new ConnectionStringBuilder()            .setNamespaceName(namespaceName)            .setEventHubName(eventHubName)            .setSasKeyName(sasKeyName)            .setSasKey(sasKey)val pool = Executors.newFixedThreadPool(1)val eventHubClient = EventHubClient.create(connStr.toString(), pool)def sendEvent(message: String) = {  val messageData = EventData.create(message.getBytes("UTF-8"))  // CONVERT IT HERE?  eventHubClient.get().send(messageData)  System.out.println("Sent event: " + message + "\n")}import twitter4j._import twitter4j.TwitterFactoryimport twitter4j.Twitterimport twitter4j.conf.ConfigurationBuilderval twitterConsumerKey = "key"val twitterConsumerSecret = "key"val twitterOauthAccessToken = "key"val twitterOauthTokenSecret = "key"val cb = new ConfigurationBuilder()  cb.setDebugEnabled(true)  .setOAuthConsumerKey(twitterConsumerKey)  .setOAuthConsumerSecret(twitterConsumerSecret)  .setOAuthAccessToken(twitterOauthAccessToken)  .setOAuthAccessTokenSecret(twitterOauthTokenSecret)val twitterFactory = new TwitterFactory(cb.build())val twitter = twitterFactory.getInstance()val query = new Query(" #happynewyear ")query.setCount(100)query.lang("en")var finished = falsewhile (!finished) {  val result = twitter.search(query)  val statuses = result.getTweets()  var lowestStatusId = Long.MaxValue  for (status <- statuses.asScala) {    if(!status.isRetweet()){      sendEvent(status.getText())    }    lowestStatusId = Math.min(status.getId(), lowestStatusId)    Thread.sleep(2000)  }  query.setMaxId(lowestStatusId - 1)} eventHubClient.get().close()
查看完整描述

1 回答

?
牧羊人nacy

TA贡献1862条经验 获得超7个赞

Scala没有将字符串转换为Json的本地方法,您需要使用一个外部库。我建议使用Jackson。如果你使用gradle这个你可以添加这样的依赖性:compile("com.fasterxml.jackson.module:jackson-module-scala_2.12")。(使用适当的scala版本)


然后,您可以像这样将数据对象简单地转换为JSON:


val mapper = new ObjectMapper() with ScalaObjectMapper

mapper.registerModule(DefaultScalaModule)


val json = valueToTree(messageData)

我强烈建议您向Jackson投入精力,如果您使用JSON,将非常需要它。


查看完整回答
反对 回复 2021-04-14
  • 1 回答
  • 0 关注
  • 170 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信