공부하는 블로그

Spark 9일차 본문

Develop/Spark

Spark 9일차

모아&모지리 2017. 11. 16. 16:59

namenode 에서 start-all.sh 치고 password는 1234 또 password 1234

>sudo useradd Admin

>hdfs dfs -chown Admin /

>hdfs dfs -mkdir /Admin

>hdfs dfs -chown Admin /Admin

namenode 에서 jps, datanode 에서도 jps

namenode 에서 ifconfig 치고 두번째 단락꺼 ip 를 사용해서 java 프로젝트 내에서

dessert.join(order, "menuId")

                .where("orderCount != 1")

                .toJavaRDD()

                .saveAsTextFile("hdfs://192.168.56.102:9000/Admin/Dessert/1"); 실행

namenode 에서 hdfs dfs -ls /Admin/Dessert/1 쳐서 파일 리스트가 나오면 성공


* 집에서 할 경우 (namenode 에서 터미널 열고)

1. 컴퓨터 계정명 확인

2. namenode 에서 아래 작성

    > sudo useradd 계정명 (Window 계정명과 동일)

    > hdfs dfs -chown 계정명 /

    > hdfs dfs -mkdir /계정명

    > hdfs dfs -chown 계정명 /계정명

    

- kafka 콘솔 에서 메시지 받아 hdfs 파일로 보내기

JavaPairInputDStream<String, String> ds2 = KafkaUtils.<String, String, StringDecoder, StringDecoder> createDirectStream(ssc, String.class, String.class, StringDecoder.class, StringDecoder.class, params, topic2);

//        ds1.print();

        ds2.map(tuple -> {

            System.out.println(tuple);

            String key = "Receive: " + tuple._1;

            String value = "Receive: " + tuple._2;


            key = key == null ? "Temp" : key;


            String[] name = value.split(",");


            return new Tuple2<>(key, new Tuple2<>(name[0], name[1]));

        })//.print();

        .dstream()

        .saveAsTextFiles("hdfs://192.168.56.102:9000/Admin/Kafka/data", "log");


- 코드에서 메시지 받아 kafka 콘솔로 보내기

Properties prop = new Properties();

        prop.put("bootstrap.servers", "localhost:9092");


        prop.put("acks", "all");

        prop.put("block.on.buffer.full", "true");


        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

        ProducerRecord<String, String> record = null;


        final String key = "app";


        Scanner scanner = new Scanner(System.in);

        String value = "";


        while(true) {


            System.out.println("발행할 메시지를 입력하세요. (, 로 데이터를 구분합니다)");

            value = scanner.nextLine();


            if (value.indexOf(",") == -1) {

                System.out.println("콤마가 없습니다. 메시지는 발행되지 않습니다.");

                continue;

            }


            record = new ProducerRecord<String, String>("sparkTest", key, value);

            producer.send(record);


            System.out.println("메시지가 발행되었습니다. 계속하려면 Y를 입력하세요.");

            if(!scanner.next().equalsIgnoreCase("Y")) {

                break;

            }

        }


        producer.close();


- Hadoop 에 쓰여진 파일을 코드에서 읽기

        String hdfsUri = "hdfs://192.168.56.102:9000";


        Configuration conf = new Configuration();


        // 하둡 연결

        FileSystem fs = FileSystem.get(URI.create(hdfsUri), conf);


        // 디렉토리 리스트 받아오기

        RemoteIterator<LocatedFileStatus> fileIterator =

                fs.listFiles(new Path("/Admin/Kafka"), true);


        // 탐색을 위한 준비

        List<LocatedFileStatus> fileList = new ArrayList();

        while(fileIterator.hasNext()) {

            fileList.add(fileIterator.next());

        }


        // 디렉토리 리스트 및 파일 이름 출력

        fileList.stream()

                .filter(FileStatus::isFile)

                .filter(fileStatus -> !fileStatus.getPath().getName().equals("_SUCCESS"))

                .filter(fileStatus -> fileStatus.getLen() > 0)

                .forEach(fileStatus -> {

                    System.out.println( fileStatus.getPath() + "/");

                    System.out.println( fileStatus.getLen());

                });

        



'Develop > Spark' 카테고리의 다른 글

Kafka 와 Zookeeper 설치하기  (0) 2017.11.16
Spark의 RDD 의 문제점  (0) 2017.11.16
Spark 예제 1  (0) 2017.11.16
Apache spark 소개 및 실습(시작하기)  (0) 2017.11.16
Spark 예제 0  (0) 2017.11.07