본문 바로가기

Study/class note

하둡 / 스파크에서 스파크 SQL 사용하기

47 스파크에서 스파크 SQL 사용하기

스파크에서 emp테이블 생성하고 쿼리하기

 

예제1. emp테이블을 drop하는 명령어

scala> sql("drop table emp")

-> emp테이블이 없어서 drop할 게 없기때문에 쿼리 실행하면 자바 에러가 잔뜩 발생함.

 

 

예제2. hive SQL을 스파크에서 사용하겠다고 지정합니다.

scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

 

예제3. emp테이블을 생성합니다.

scala> sqlContext.sql("create  table  IF  NOT EXISTS  emp( empno int, ename  string,job string, mgr  int, hiredate string, sal  int,  comm  int,  deptno  int )row format  delimited  fields  terminated  by  ',' lines  terminated  by  '\n' ")
res8: org.apache.spark.sql.DataFrame = []

 

여러줄로 쓰고 싶으면 sqlContext.sql()안에 쿼리 쓸 때 """ 쿼테이션 마크 3개 써서 엔터로 라인구분 하면 됨(파이썬 처럼)

 

 

예제4. emp테이블에 /home/oracle/emp.csv를 입력하시오.

(base) [oracle@centos ~]$ ls -l emp.csv
-rw-rw-r--. 1 oracle oracle 633  3월 23 13:46 emp.csv
scala> sqlContext.sql("LOAD DATA LOCAL INPATH '/home/oracle/emp.csv' INTO TABLE emp")
res9: org.apache.spark.sql.DataFrame = []

 

 

예제5. 사원 테이블의 건수를 확인하시오.

scala> sql("select * from emp").count()
res10: Long = 14

 

예제6. 부서번호가 30번인 사원들의 이름과 월급과 부서번호를 출력하시오.

scala> sql(""" select ename, sal, deptno
     | from emp
     | where deptno = 30 """).show()
+------+----+------+
| ename| sal|deptno|
+------+----+------+
| BLAKE|2850|    30|
|MARTIN|1250|    30|
| ALLEN|1600|    30|
|TURNER|1500|    30|
| JAMES| 950|    30|
|  WARD|1250|    30|
+------+----+------+

 

 

예제7. 직업이 SALESMAN인 사원들의 이름과 월급과 직업을 출력하시오.

scala> sql(""" select ename, sal, job
     | from emp
     | where job = 'SALESMAN' """).show()
+------+----+--------+
| ename| sal|     job|
+------+----+--------+
|MARTIN|1250|SALESMAN|
| ALLEN|1600|SALESMAN|
|TURNER|1500|SALESMAN|
|  WARD|1250|SALESMAN|
+------+----+--------+

 

 

문제293. (점심시간 문제) 직업, 직업별 토탈월급을 출력하는데 직업별 토탈월급이 높은 것부터 출력하시오.

scala> sql(""" select job, sum(sal) as sumsal
     | from emp
     | group by job
     | order by sumsal desc""").show()
+---------+------+
|      job|sumsal|
+---------+------+
|  MANAGER|  8275|
|  ANALYST|  6000|
| SALESMAN|  5600|
|PRESIDENT|  5000|
|    CLERK|  4150|
+---------+------+

 

문제294. 위의 결과를 다시 출력하는데 having절을 사용해서 직업별 토탈월급이 5000 이상인것만 출력하시오.

scala> sql("show tables").show()
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|      emp|      false|
| employee|      false|
+---------+-----------+


scala> sql(""" select job, sum(sal) as sumsal
     | from emp
     | group by job
     | having sumsal >= 5000
     | order by sumsal desc""").show()
+---------+------+
|      job|sumsal|
+---------+------+
|  MANAGER|  8275|
|  ANALYST|  6000|
| SALESMAN|  5600|
|PRESIDENT|  5000|
+---------+------+

오라클과 다르게 컬럼별칭을 having절에 쓸 수 있음.

 

 

문제295. 이름, 월급, 월급에 대한 순위를 출력하시오.

scala> sql(""" select ename, sal, rank() over (order by sal desc) as rnk
     |            from  emp """).show()

22/03/28 13:51:08 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

설명: 위의 경고 메세지는 진짜 대용량 데이터를 조회하는것으로 간주하고 쿼리를 수행하는것
       이므로 성능을 좋게 하기 위해서는 partition by 를 사용해라 ~ 라고 권장하는 것입니다.

 

 

문제296. 부서번호, 이름, 월급, 월급에 대한 순위를 출력하는데 순위가 부서번호별로 각각
            월급이 많은 순서데로 순위를 부여하시오 !

scalar> sql("""  select deptno, ename, sal, rank() over (partition by deptno
                                                                     order by sal desc ) as rnk
                  from emp """).show()

 

문제297. (스칼라 SQL의 장점) 직업, 직업별 토탈월급을 출력하는데 결과로 숫자로 나오는 값들에 대한 통계값을 출력하시오 !

scala> sql(""" select job, sum(sal)
     | from emp
     | group by job""").describe().show()
22/03/28 14:01:47 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
+-------+------------------+
|summary|          sum(sal)|
+-------+------------------+
|  count|                 5|
|   mean|            5805.0|
| stddev|1546.6091943344964|
|    min|              4150|
|    max|              8275|
+-------+------------------+

 

문제298. 입사년도(4자리), 입사년도별 토탈월급의 출력값에 대한 통계값 5가지를 출력하시오.

scala> sql(""" select year(to_date(hiredate)), sum(sal)
     | from emp
     | group by year(to_date(hiredate)) """).describe().show()
+-------+-------------------------------------+------------------+
|summary|year(to_date(CAST(hiredate AS DATE)))|          sum(sal)|
+-------+-------------------------------------+------------------+
|  count|                                    4|                 4|
|   mean|                               1981.5|           7256.25|
| stddev|                   1.2909944487358154|10499.332816104714|
|    min|                                 1980|               800|
|    max|                                 1983|             22825|
+-------+-------------------------------------+------------------+

 

문제299. 사원 테이블을 출력하는데 위의 3줄만 출력하시오.

# head()
scala> sql(""" select * from emp """).head(3)
res2: Array[org.apache.spark.sql.Row] = Array([7839,KING,PRESIDENT,null,1981-11-17,5000,null,10], [7698,BLAKE,MANAGER,7839,1981-05-01,2850,null,30], [7782,CLARK,MANAGER,7839,1981-05-09,2450,null,10])


# limit
scala> sql(""" select * from emp limit 3""").show()
+-----+-----+---------+----+----------+----+----+------+
|empno|ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+-----+---------+----+----------+----+----+------+
| 7839| KING|PRESIDENT|null|1981-11-17|5000|null|    10|
| 7698|BLAKE|  MANAGER|7839|1981-05-01|2850|null|    30|
| 7782|CLARK|  MANAGER|7839|1981-05-09|2450|null|    10|
+-----+-----+---------+----+----------+----+----+------+

 

문제300. 이름, 월급, 월급에 대한 순위를 출력하는데 위에 3줄만 출력하시오.

scala> sql(""" select ename, sal, rank() over (order by sal desc) as rnk
     | from emp limit 3""").show()
22/03/28 14:10:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+-----+----+---+
|ename| sal|rnk|
+-----+----+---+
| KING|5000|  1|
| FORD|3000|  2|
|SCOTT|3000|  2|
+-----+----+---+

 

문제301. dept테이블을 생성하기 위해서 /home/oracle 밑에 dept.csv가 있는지 확인하시오.

(base) [oracle@centos ~]$ ls -l dept.csv
-rw-rw-r--. 1 oracle oracle 102  3월 23 11:52 dept.csv

 

문제302. 스파크에서 dept테이블을 생성하시오.

scala> sqlContext.sql("""create  table  IF  NOT EXISTS  dept( deptno int, dname  string, loc string)
     | row format  delimited
     | fields  terminated  by  ','
     | lines  terminated  by  '\n' """)
res6: org.apache.spark.sql.DataFrame = []

scala> sql("show tables").show()
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|     dept|      false|
|      emp|      false|
| employee|      false|
+---------+-----------+

 

문제303. dept테이블에 /home/oracle 밑에 dept.csv의 데이터를 입력하시오.

scala> sqlContext.sql("LOAD DATA LOCAL INPATH '/home/oracle/dept.csv' INTO TABLE dept")
res8: org.apache.spark.sql.DataFrame = []

scala> sql("select * from dept").show()
+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|  null|     dname|     loc|
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+

scala> sql("drop table dept")
res10: org.apache.spark.sql.DataFrame = []

/home/oracle/dept.csv를 vi편집기로 열어서 컬럼 삭제 수정

scala> sqlContext.sql("""create  table  IF  NOT EXISTS  dept( deptno int, dname  string, loc string)
     | row format  delimited
     | fields  terminated  by  ','
     | lines  terminated  by  '\n' """)
res11: org.apache.spark.sql.DataFrame = []

scala> sqlContext.sql("LOAD DATA LOCAL INPATH '/home/oracle/dept.csv' INTO TABLE dept")
res12: org.apache.spark.sql.DataFrame = []

scala> sql("select * from dept").show()
+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+

 

문제304. 사원 테이블과 부서 테이블을 조인해서 이름과 부서위치를 출력하시오.

-> 1999ansi 조인문법의 on절을 사용한 조인 문법으로 수행하세요

scala> sql(""" select e.ename, d.loc
     | from emp e join dept d
     | on (e.deptno = d.deptno) """).show()
+------+--------+
| ename|     loc|
+------+--------+
|  KING|NEW YORK|
| BLAKE| CHICAGO|
| CLARK|NEW YORK|
| JONES|  DALLAS|
|MARTIN| CHICAGO|
| ALLEN| CHICAGO|
|TURNER| CHICAGO|
| JAMES| CHICAGO|
|  WARD| CHICAGO|
|  FORD|  DALLAS|
| SMITH|  DALLAS|
| SCOTT|  DALLAS|
| ADAMS|  DALLAS|
|MILLER|NEW YORK|
+------+--------+

 

문제305. DALLAS에서 근무하는 사원들의 이름, 부서위치, 월급, 직업을 출력하는데 직업이 annalyst는 제외하고 출력하시오.

scala> sql(""" select e.ename, d.deptno, e.sal, e.job
     | from emp e join dept d
     | on (e.deptno = d.deptno)
     | where d.loc = 'DALLAS' and e.job != 'ANALYST' """).show()
+-----+------+----+-------+
|ename|deptno| sal|    job|
+-----+------+----+-------+
|JONES|    20|2975|MANAGER|
|SMITH|    20| 800|  CLERK|
|ADAMS|    20|1100|  CLERK|
+-----+------+----+-------+

 

문제306. ALLEN보다 늦게 입사한 사원들의 이름과 입사일을 출력하시오.

scala> sql(""" select ename, hiredate
     | from emp
     | where hiredate > (select hiredate
     |                      from emp
     |                      where ename = 'ALLEN') """).show()
+------+----------+
| ename|  hiredate|
+------+----------+
|  KING|1981-11-17|
| BLAKE|1981-05-01|
| CLARK|1981-05-09|
| JONES|1981-04-01|
|MARTIN|1981-09-10|
|TURNER|1981-08-21|
| JAMES|1981-12-11|
|  WARD|1981-02-23|
|  FORD|1981-12-11|
| SCOTT|1982-12-22|
| ADAMS|1983-01-15|
|MILLER|1982-01-11|
+------+----------+

 

 

하둡의 하이브와 스파크에서 결론적으로 뭘 생성해줘야 하는가?

빅데이터에서 원하는 데이터만 검색해서 그 결과를 csv로 저장하는 역할을 해줘야함.

 

예제1. 직업과 직업별 토탈월급의 결과를 csv 파일로 저장하시오.

scala> sql(""" select job, sum(sal)
     | from emp
     | group by job""").coalesce(1).write.option("header","true").option("sep",",").mode("overwrite").csv("/home/oracle/ff")

coalesce(1) : 하나의 파일에 모두 담아라. 숫자 2 넣으면 2개의 파일에 나눠서 담는다는 뜻

write.option("header","true") : 컬럼명이 나오게 저장

option("sep",",") : 구분자 콤마, csv 파일 형태로 저장

mode("overwrite").csv("/home/oracle/ff") : /home/oracle/ff라는 폴더를 만들어서 폴더 안에 데이터를 생성해라

 

 

scala에서 파일을 리눅스 os로 내린 후 다른 터미널 창에서 파일 열어서 확인

(base) [oracle@centos ~]$ ls -ld ff
drwxrwxr-x. 2 oracle oracle 170  3월 28 15:19 ff
(base) [oracle@centos ~]$ cd ff
(base) [oracle@centos ff]$ ls
_SUCCESS  part-r-00000-0cae0e90-2ded-4b2b-acfb-212f5bab1073.csv

(base) [oracle@centos ff]$ cat part-r-00000-0cae0e90-2ded-4b2b-acfb-212f5bab1073.csv
job,sum(sal)
ANALYST,6000
SALESMAN,5600
CLERK,4150
MANAGER,8275
PRESIDENT,5000

 

 

문제307. 부서위치, 부서위치별 토탈월급을 출력하는데 부서위치별 토탈월급이 2000 이상인 것만 출력하고 부서위치 CHICAGO는 제외, 부서위치별 토탈월급이 높은 것부터 출력하는 결과를 /home/oracle 밑에 kk라는 폴더 안에 csv파일 형태로 생성되게 하시오.

scala> sql(""" select d.loc, sum(e.sal) as sumsal
     | from emp e join dept d
     | on (e.deptno = d.deptno)
     | where d.loc != 'CHICAGO'
     | group by d.loc
     | having sum(e.sal) >= 2000
     | order by sumsal desc""").coalesce(1).write.option("header","true").option("sep",",").mode("overwrite").csv("/home/oracle/kk")

scala에서 파일을 리눅스 os로 내린 후 다른 터미널 창에서 파일 열어서 확인

(base) [oracle@centos ~]$ cd kk
(base) [oracle@centos kk]$ ls
_SUCCESS  part-r-00000-789c874c-fc12-47b3-b66e-bae4582d0040.csv
(base) [oracle@centos kk]$ cat part-r-00000-789c874c-fc12-47b3-b66e-bae4582d0040.csv
loc,sumsal
DALLAS,10875
NEW YORK,8750

 

 

반응형