2017-01-27 13 views
0

Привет Я пытаюсь войти в тему Kafka из группы исполнителей, использующих Apache Spark с Log4J и дополнением Kafka-Appender. Я могу регистрироваться с исполнителями, используя базовый File Appender, но не для Kafka.Функциональность Log4J-Kafka в Apache Spark/scala

Вот мое log4j.properties я сделал обычай для этого:

log4j.rootLogger=INFO, console, KAFKA, file 

log4j.appender.console=org.apache.log4j.ConsoleAppender 
log4j.appender.console.target=System.err 
log4j.appender.console.layout=org.apache.log4j.PatternLayout 
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n 


log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender 
log4j.appender.KAFKA.topic=test2 
log4j.appender.KAFKA.name=localhost 
log4j.appender.KAFKA.host=localhost 
log4j.appender.KAFKA.port=9092 
log4j.appender.KAFKA.brokerList=localhost:9092 
log4j.appender.KAFKA.compressionType=none 
log4j.appender.KAFKA.requiredNumAcks=0 
log4j.appender.KAFKA.syncSend=true 
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout 
log4j.appender.KAFKA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% - %m%n 



log4j.appender.file=org.apache.log4j.RollingFileAppender 
log4j.appender.file.File=log4j-application.log 
log4j.appender.file.MaxFileSize=5MB 
log4j.appender.file.MaxBackupIndex=10 
log4j.appender.file.layout=org.apache.log4j.PatternLayout 
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n 

Вот мой код (до сих пор). Я попытался передать определение регистратора так, что каждый исполнитель получает копию, но я не знаю, почему он не собирается Кафка:

import org.apache.log4j._ 
import org.apache.spark._ 
import org.apache.spark.rdd.RDD 
import java.io._ 
import org.apache.kafka.log4jappender.KafkaLog4jAppender 

class Mapper(n: Int) extends Serializable{ 
    @transient lazy val suplogger: Logger = Logger.getLogger("myLogger") 

    def doSomeMappingOnDataSetAndLogIt(rdd: RDD[Int]): RDD[String] = 
    rdd.map{ i => 
     val sparkConf: SparkConf =new org.apache.spark.SparkConf() 
     logger.setLevel((Level) Level.ALL) 
     suplogger.warn(sparkConf.toDebugString) 
     val pid = Integer.parseInt(new File("/proc/self").getCanonicalFile().getName()); 
     suplogger.warn("--------------------") 
     suplogger.warn("mapping: " + i) 
     val supIterator = new scala.collection.JavaConversions.JEnumerationWrapper(suplogger.getAllAppenders()) 
     suplogger.warn("List is " + supIterator.toList) 
     suplogger.warn("Num of list is: " + supIterator.size) 

     //(i + n).toString 
     "executor pid = "+pid + "debug string: " + sparkConf.toDebugString.size 
    } 
} 

object Mapper { 
    def apply(n: Int): Mapper = new Mapper(n) 
} 

object HelloWorld { 
    def main(args: Array[String]): Unit = { 
    println("sup") 
    println("yo") 
    val log = LogManager.getRootLogger 
    log.setLevel(Level.WARN) 
    val nameIterator = new scala.collection.JavaConversions.JEnumerationWrapper(log.getAllAppenders()) 
    println(nameIterator.toList) 

    val conf = new SparkConf().setAppName("demo-app") 
    val sc = new SparkContext(conf) 
    log.warn(conf.toDebugString) 
    val pid = Integer.parseInt(new File("/proc/self").getCanonicalFile().getName()); 
    log.warn("--------------------") 
    log.warn("IP: "+java.net.InetAddress.getLocalHost() +" PId: "+pid) 

    log.warn("Hello demo") 

    val data = sc.parallelize(1 to 100, 10) 

    val mapper = Mapper(1) 

    val other = mapper.doSomeMappingOnDataSetAndLogIt(data) 

    other.collect() 

    log.warn("I am done") 
    } 

} 

Вот некоторый пример вывод из файла журнала:

2017-01-25 06:29:15 WARN myLogger:19 - spark.driver.port=54335 
2017-01-25 06:29:15 WARN myLogger:21 - -------------------- 
2017-01-25 06:29:15 WARN myLogger:23 - mapping: 1 
2017-01-25 06:29:15 WARN myLogger:25 - List is List() 
2017-01-25 06:29:15 WARN myLogger:26 - Num of list is: 0 
2017-01-25 06:29:15 WARN myLogger:19 - spark.driver.port=54335 
2017-01-25 06:29:15 WARN myLogger:21 - -------------------- 
2017-01-25 06:29:15 WARN myLogger:23 - mapping: 2 
2017-01-25 06:29:15 WARN myLogger:25 - List is List() 
2017-01-25 06:29:15 WARN myLogger:26 - Num of list is: 0 
2017-01-25 06:29:15 WARN myLogger:19 - spark.driver.port=54335 
2017-01-25 06:29:15 WARN myLogger:21 - -------------------- 

Спасибо за вашу помощь, если у вас есть что-нибудь, что вы, ребята (или девочки), мне от меня, пожалуйста, дайте мне знать!

Вот копия команды искровой представить

искрового представить --deploy режим клиента --files spark_test/mylogger.props --packages «com.databricks: искровой csv_2.10: 1,4. 0, org.apache.kafka: kafka-log4j-appender: 0.10.1.1 "--num-executors 4 --executor-core 1 --driver-java-options" -Dlog4j.configuration = file: /// home/mapr/spark_test/mylogger.props "--conf" spark.executor.extraJavaOptions = -Dlog4j.configuration = файл: ///home/mapr/spark_test/mylogger.props "--class" HelloWorld "helloworld.jar

+0

Где вы запускаете свою искру? В кластере YARN? Если да, то у вас может возникнуть проблема с вашим брокером (localhost). Вы видите что-нибудь в stdout/stderr? – TobiSH

+0

Я действительно понял, в чем проблема! Я буду следить – Sparkhelppls

+0

@TobiSH Я понял это, но я не уверен, почему мое решение работает – Sparkhelppls

ответ

0

Я понял, в чем проблема. Я не развертывал кластер, я развертывал только в клиентском режиме. По правде говоря, я не знаю, почему это сработало, когда я отправил в кластер.

Я использовал MapR Песочница VM https://www.mapr.com/products/mapr-sandbox-hadoop

Если кто-нибудь может помочь объяснить, почему клиент/кластерный разница здесь я был бы очень благодарен!