- 우분투 실행 환경에서 시작!!
- (1) 필요 라이브러리 설치
sudo apt-get -y update
sudo apt-get -y upgrade
sudo apt-get -y dist-upgrade
sudo apt-get install -y vim wget unzip ssh openssh-* net-tools - (2) 자바 설치
sudo apt-get install -y openjdk-8-jdk
java -version >>> 자바 버전 확인
sudo find / -name java-8-openjdk-amd64 2>/dev/null >>> 환경변수에 추가해줄 경로를 확인
(find . -name [FILE] 2> /dev/null검색 중 에러 메시지 출력하지 않기 ) - sudo vim /etc/environment
- PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin:/usr/local/hadoop/bin:/usr/local/hadoop/sbin:/usr/local/spark/bin:/usr/local/spark/sbin:/usr/bin/python3"
JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64"
HADOOP_HOME="/usr/local/hadoop"
SPARK_HOME="/usr/local/spark" - source /etc/environment
- 1. 하둡 설치
sudo wget https://ftp.sakura.ad.jp/pub/apache/hadoop/common/hadoop-3.2.3/hadoop-3.2.3.tar.gz
2. 압축 해제
sudo tar -zxvf hadoop-3.2.3.tar.gz -C /usr/local
3. 이름 변경(좀더 쉽게쉽게 하기위해)
sudo mv /usr/local/hadoop-3.2.3 /usr/local/hadoop
#mv는 파일 위치 변경 또는 파일 명 변경 모두 가능한 명령 - source ~/.bashrc
- sudo echo 'export HADOOP_HOME=/usr/local/hadoop' >> ~/.bashrc sudo echo 'export
- source ~/.bashrc
- sudo vim $HADOOP_HOME/etc/hadoop/core-site.xml 들어가서 아래 코드 입력
<configuration>
<property>
# HDFS의 기본 파일 시스템 이름을 지정합니다. 여기서는
# hdfs://nn1:9000으로 설정되어 있으므로
# 클라이언트가 HDFS에 액세스할 때
# 기본적으로 nn1 노드의 9000번 포트를 사용
<name>fs.default.name</name>
# 로컬에 단일노드로 할 예정으로 ///으로 설정
<value>hdfs:///</value>
</property>
</configuration>
11. hadoop-env.sh 파일 편집 [ Hadoop 실행 환경을 구성하는 데 사용되는 스크립트 ]
- sudo vim $HADOOP_HOME/etc/hadoop/hadoop-env.sh
> / export 로 검색 후 해당 줄 주석 지우고 변경
i 누르면 인서트 가능 :wq으로 저장
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export HADOOP_HOME=/usr/local/hadoop
12. 1.스파크 설치 sudo wget https://ftp.unicamp.br/pub/apache/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz
2. 압축 해제 sudo tar -xzvf spark-3.2.4-bin-hadoop3.2.tgz -C /usr/local
3. 이름 변경 sudo mv /usr/local/spark-3.2.4-bin-hadoop3.2 /usr/local/spark
13. source 명령을 통해 지역 변수로 올린다. source /etc/environment
14. spark-env.sh 파일 편집 [ Spark 및 Hadoop을 실행 및 구성하는 데 사용될 환경 변수 ]
cd $SPARK_HOME/conf
sudo cp spark-env.sh.template spark-env.sh
sudo vim spark-env.sh
export SPARK_HOME=/usr/local/spark
export SPARK_CONF_DIR=/usr/local/spark/conf
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export SPARK_MASTER_WEBUI_PORT=18080
15. sudo mkdir -p /usr/local/spark/logs && sudo chown -R $USER:$USER /usr/local/spark/
16. (5) 파이썬, 파이스파크 설치
- sudo apt-get install -y python3-pip
- sudo pip3 install pyspark findspark
- source ~/.bashrc
- sudo echo 'export PYTHONPATH=/usr/bin/python3' >> ~/.bashrc
- sudo echo 'export PYSPARK_PYTHON=/usr/bin/python3' >> ~/.bashrc
- source ~/.bashrc
17. 데이터 수집 py파일을 만들어준다 vim first_coin.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, year, month, dayofmonth, hour, minute
import requests
# 스파크 세션 생성
spark = SparkSession.builder \
.master("local") \
.appName("Bithumb API to CSV") \
.getOrCreate()
# API URL
api_url = "https://api.bithumb.com/public/ticker/ALL_"
# API 데이터 요청 및 JSON 데이터 추출
response = requests.get(api_url)
data = response.json()
# data 필드에서 코인 정보 추출
coin_data = data.get("data")
# "date" 필드 제외
coin_data.pop("date", None)
# 코인 정보를 RDD로 변환
coin_rdd = spark.sparkContext.parallelize(coin_data.items())
# RDD를 DataFrame으로 변환
coin_df = coin_rdd.toDF(["Coin", "Info"])
# Info 필드를 JSON 형태의 컬럼들로 분리
for field in ["opening_price", "closing_price", "min_price", "max_price", "units_traded",
"acc_trade_value", "prev_closing_price", "units_traded_24H", "acc_trade_value_24H"]:
coin_df = coin_df.withColumn(field, coin_df["Info"][field])
# 불필요한 컬럼 제거
coin_df = coin_df.drop("Info", "fluctuate_24H", "fluctuate_rate_24H")
# 현재 시간과 각각의 시간 정보를 추출하여 컬럼으로 추가
coin_df = coin_df.withColumn("now_time", current_timestamp())
coin_df = coin_df.withColumn("year", year("now_time"))
coin_df = coin_df.withColumn("month", month("now_time"))
coin_df = coin_df.withColumn("day", dayofmonth("now_time"))
coin_df = coin_df.withColumn("hour", hour("now_time"))
coin_df = coin_df.withColumn("minute", minute("now_time"))
# DataFrame을 CSV 파일로 저장
coin_df.coalesce(1).write.option("header", "true").option("ignoreSuccessFile", "true").csv("coin_data.csv")
print("CSV 파일이 생성되었습니다.")
18. 스파크 파일 실행 -> spark-submit --master local first_coin.py
19. api 데이터를 가지고 올떄마다 기존의 csv파일과 합친다.
# 스파크 세션 생성
spark = SparkSession.builder \
.master("local") \
.appName("Bithumb API to CSV") \
.getOrCreate()
# API URL
api_url = "https://api.bithumb.com/public/ticker/ALL_"
# API 데이터 요청 및 JSON 데이터 추출
response = requests.get(api_url)
data = response.json()
# data 필드에서 코인 정보 추출
coin_data = data.get("data")
# "date" 필드 제외
coin_data.pop("date", None)
# 코인 정보를 RDD로 변환
coin_rdd = spark.sparkContext.parallelize(coin_data.items())
# RDD를 DataFrame으로 변환
coin_df = coin_rdd.toDF(["Coin", "Info"])
# Info 필드를 JSON 형태의 컬럼들로 분리
for field in ["opening_price", "closing_price", "min_price", "max_price", "units_traded",
"acc_trade_value", "prev_closing_price", "units_traded_24H", "acc_trade_value_24H"]:
coin_df = coin_df.withColumn(field, coin_df["Info"][field])
# 불필요한 컬럼 제거
coin_df = coin_df.drop("Info", "fluctuate_24H", "fluctuate_rate_24H")
# 현재 시간과 각각의 시간 정보를 추출하여 컬럼으로 추가
coin_df = coin_df.withColumn("now_time", current_timestamp())
coin_df = coin_df.withColumn("year", year("now_time"))
coin_df = coin_df.withColumn("month", month("now_time"))
coin_df = coin_df.withColumn("day", dayofmonth("now_time"))
coin_df = coin_df.withColumn("hour", hour("now_time"))
coin_df = coin_df.withColumn("minute", minute("now_time"))
# 이전 데이터프레임 읽어오기
previous_df = spark.read.option("header", "true").csv("coin_data.csv/*.csv")
# 두 데이터프레임을 수직으로 합치기
merged_df = previous_df.union(coin_df)
# 새로운 디렉토리 저장
merged_df.coalesce(1).write.option("header", "true").option("ignoreSuccessFile", "true").csv("coin_data_temp.csv")
# 기존 디렉토리 삭제
os.system("-rm -r coin_data.csv")
# 새로운 디렉터리 이름 변경
os.system("-mv coin_data_temp.csv coin_data.csv")
print("union출력")
# 스파크 세션 종료
spark.stop()
20. 스파크와 몽고db를 연결하여 local에 있는 csv 파일을 다른 서버에 있는 몽고db에 적재한다.
from pyspark.sql import SparkSession
import pymongo
# 스파크 세션 생성
spark = SparkSession.builder \
.master("local") \
.appName("CSV to MongoDB") \
.getOrCreate()
# HDFS에서 CSV 파일 읽어오기
csv_path = "coin_data.csv/*.csv"
csv_df = spark.read.option("header", "true").csv(csv_path)
# MongoDB에 연결
client = pymongo.MongoClient("mongodb://10.0.9.85:27017/")
db = client["coin"]
collection = db["coin_data"]
# 데이터프레임을 MongoDB에 저장
csv_df.write.format("mongo").mode("overwrite").option("uri", "mongodb://10.0.9.85:27017/coin.coin_data").save()
print("성공")
# 스파크 세션 종료
spark.stop()
~