티스토리 뷰

의존성

/*
 * This file was generated by the Gradle 'init' task.
 *
 * This generated file contains a sample Scala library project to get you started.
 * For more details take a look at the 'Building Java & JVM projects' chapter in the Gradle
 * User Manual available at https://docs.gradle.org/7.1/userguide/building_java_projects.html
 */

plugins {
    // Apply the scala Plugin to add support for Scala.
    id 'scala'

    // Apply the java-library plugin for API and implementation separation.
    id 'java-library'
}

repositories {
    // Use Maven Central for resolving dependencies.
    mavenCentral()
}

def versions = [
        AkkaVersion: "2.6.14",
        JacksonVersion: "2.11.4",
        ScalaBinary: "2.13"
]

dependencies {
    // Use Scala 2.13 in our library project
    implementation 'org.scala-lang:scala-library:2.13.5'

    // This dependency is used internally, and not exposed to consumers on their own compile classpath.
    implementation 'com.google.guava:guava:30.1-jre'

    // Use Scalatest for testing our library
    testImplementation 'junit:junit:4.13.2'
    testImplementation 'org.scalatest:scalatest_2.13:3.2.6'
    testImplementation 'org.scalatestplus:junit-4-13_2.13:3.2.2.0'

    // Need scala-xml at test runtime
    testRuntimeOnly 'org.scala-lang.modules:scala-xml_2.13:1.2.0'

    // This dependency is exported to consumers, that is to say found on their compile classpath.
    api 'org.apache.commons:commons-math3:3.6.1'

    implementation "com.typesafe.akka:akka-actor-typed_${versions.ScalaBinary}:${versions.AkkaVersion}"
    implementation "com.typesafe.akka:akka-stream-kafka_${versions.ScalaBinary}:2.1.0"
    implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
    implementation "com.fasterxml.jackson.core:jackson-databind:${versions.JacksonVersion}"
}

 

카프카 소싱해서 카프라로 내보내는 간단한 코드

package somepackage

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import akka.kafka.scaladsl.{Consumer, Producer}
import org.apache.kafka.clients.producer.ProducerRecord

object Main extends App {

  implicit val actorSystem: ActorSystem[Nothing]  = ActorSystem[Nothing](Behaviors.empty, "video-metric-stream")

  // Consumer source 생성
  val kafkaConsumerBootstrapServers = "some-bootstrap-server:9092"
  val kafkaConsumerTopic = "some-source-topic"
  val kafkaConsumerSettings = ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers(kafkaConsumerBootstrapServers)
    .withGroupId("some-group-id")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")
  val kafkaCommitableSource = Consumer.committableSource(kafkaConsumerSettings, Subscriptions.topics(kafkaConsumerTopic))

  // Producer sink 생성
  val kafkaProducerBootstrapServers = "another-bootstrap-server:9092"
  val kafkaProducerTopic = "some-dest-topic"
  val producerSettings = ProducerSettings(actorSystem, new StringSerializer, new StringSerializer)
    .withBootstrapServers(kafkaProducerBootstrapServers)
  val kafkaPlainSink = Producer.plainSink(producerSettings)

  // consumer -> producer 로 데이터 단순 전달
  kafkaCommitableSource
    .map (record => new ProducerRecord[String, String](kafkaProducerTopic, record.record.value()))
    .runWith(kafkaPlainSink)



}

 

참조

https://blog.knoldus.com/alpakka/  Kafka 토픽을 소싱해서 Kafka 토픽으로 내보내는 예제가 설명되어 있다.

https://github.com/akka/alpakka-samples   java 코드만 있는 예제도 있고 보기가 어렵다.

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/03   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
31
글 보관함