본문 바로가기

카테고리 없음

로컬에서 pyspark 실행시키기

  1. 우분투 실행 환경에서 시작!!
  2. (1) 필요 라이브러리 설치
    sudo apt-get -y update 
    sudo apt-get -y upgrade 
    sudo apt-get -y dist-upgrade
    sudo apt-get install -y vim wget unzip ssh openssh-* net-tools
  3. (2) 자바 설치
    sudo apt-get install -y openjdk-8-jdk
    java -version >>> 자바 버전 확인

    sudo find / -name java-8-openjdk-amd64 2>/dev/null >>> 환경변수에 추가해줄 경로를 확인
    (find . -name [FILE] 2> /dev/null검색 중 에러 메시지 출력하지 않기 )
  4. sudo vim /etc/environment
  5. PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin:/usr/local/hadoop/bin:/usr/local/hadoop/sbin:/usr/local/spark/bin:/usr/local/spark/sbin:/usr/bin/python3"

    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64"
    HADOOP_HOME="/usr/local/hadoop"
    SPARK_HOME="/usr/local/spark"

  6. source /etc/environment
  7. 1. 하둡 설치
    sudo wget https://ftp.sakura.ad.jp/pub/apache/hadoop/common/hadoop-3.2.3/hadoop-3.2.3.tar.gz

    2. 압축 해제
    sudo tar -zxvf hadoop-3.2.3.tar.gz -C /usr/local

    3. 이름 변경(좀더 쉽게쉽게 하기위해)
    sudo mv /usr/local/hadoop-3.2.3 /usr/local/hadoop 

    #mv는 파일 위치 변경 또는 파일 명 변경 모두 가능한 명령
  8. source ~/.bashrc
  9. sudo echo 'export HADOOP_HOME=/usr/local/hadoop' >> ~/.bashrc sudo echo 'export 
  10. source ~/.bashrc
  11. sudo vim $HADOOP_HOME/etc/hadoop/core-site.xml 들어가서 아래 코드 입력
<configuration>
        <property>
                        # HDFS의 기본 파일 시스템 이름을 지정합니다. 여기서는 
                # hdfs://nn1:9000으로 설정되어 있으므로 
                # 클라이언트가 HDFS에 액세스할 때 
                # 기본적으로 nn1 노드의 9000번 포트를 사용
                <name>fs.default.name</name>
                # 로컬에 단일노드로 할 예정으로 ///으로 설정
                <value>hdfs:///</value>
        </property>

</configuration>

11. hadoop-env.sh 파일 편집 [ Hadoop 실행 환경을 구성하는 데 사용되는 스크립트 ]

   - sudo vim $HADOOP_HOME/etc/hadoop/hadoop-env.sh

 

> / export 로 검색 후 해당 줄 주석 지우고 변경 
i 누르면 인서트 가능 :wq으로 저장
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export HADOOP_HOME=/usr/local/hadoop

12. 1.스파크 설치 sudo wget https://ftp.unicamp.br/pub/apache/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz

      2. 압축 해제 sudo tar -xzvf spark-3.2.4-bin-hadoop3.2.tgz -C /usr/local

      3. 이름 변경 sudo mv /usr/local/spark-3.2.4-bin-hadoop3.2 /usr/local/spark

 

13. source 명령을 통해 지역 변수로 올린다. source /etc/environment

 

14. spark-env.sh 파일 편집 [ Spark 및 Hadoop을 실행 및 구성하는 데 사용될 환경 변수 ]

cd $SPARK_HOME/conf

sudo cp spark-env.sh.template spark-env.sh

sudo vim spark-env.sh


export SPARK_HOME=/usr/local/spark
export SPARK_CONF_DIR=/usr/local/spark/conf
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export SPARK_MASTER_WEBUI_PORT=18080

15.  sudo mkdir -p /usr/local/spark/logs && sudo chown -R $USER:$USER /usr/local/spark/

 

16. (5) 파이썬, 파이스파크 설치

  • sudo apt-get install -y python3-pip
  • sudo pip3 install pyspark findspark
  • source ~/.bashrc
  • sudo echo 'export PYTHONPATH=/usr/bin/python3' >> ~/.bashrc
  • sudo echo 'export PYSPARK_PYTHON=/usr/bin/python3' >> ~/.bashrc
  • source ~/.bashrc

17. 데이터 수집 py파일을 만들어준다 vim first_coin.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, year, month, dayofmonth, hour, minute
import requests

# 스파크 세션 생성
spark = SparkSession.builder \
    .master("local") \
    .appName("Bithumb API to CSV") \
    .getOrCreate()

# API URL
api_url = "https://api.bithumb.com/public/ticker/ALL_"

# API 데이터 요청 및 JSON 데이터 추출
response = requests.get(api_url)
data = response.json()

# data 필드에서 코인 정보 추출
coin_data = data.get("data")

# "date" 필드 제외
coin_data.pop("date", None)

# 코인 정보를 RDD로 변환
coin_rdd = spark.sparkContext.parallelize(coin_data.items())

# RDD를 DataFrame으로 변환
coin_df = coin_rdd.toDF(["Coin", "Info"])

# Info 필드를 JSON 형태의 컬럼들로 분리
for field in ["opening_price", "closing_price", "min_price", "max_price", "units_traded",
              "acc_trade_value", "prev_closing_price", "units_traded_24H", "acc_trade_value_24H"]:
    coin_df = coin_df.withColumn(field, coin_df["Info"][field])

# 불필요한 컬럼 제거
coin_df = coin_df.drop("Info", "fluctuate_24H", "fluctuate_rate_24H")

# 현재 시간과 각각의 시간 정보를 추출하여 컬럼으로 추가
coin_df = coin_df.withColumn("now_time", current_timestamp())
coin_df = coin_df.withColumn("year", year("now_time"))
coin_df = coin_df.withColumn("month", month("now_time"))
coin_df = coin_df.withColumn("day", dayofmonth("now_time"))
coin_df = coin_df.withColumn("hour", hour("now_time"))
coin_df = coin_df.withColumn("minute", minute("now_time"))

# DataFrame을 CSV 파일로 저장
coin_df.coalesce(1).write.option("header", "true").option("ignoreSuccessFile", "true").csv("coin_data.csv")

print("CSV 파일이 생성되었습니다.")

18. 스파크 파일 실행 ->  spark-submit --master local first_coin.py

 

19. api 데이터를 가지고 올떄마다 기존의 csv파일과 합친다.

# 스파크 세션 생성
spark = SparkSession.builder \
    .master("local") \
    .appName("Bithumb API to CSV") \
    .getOrCreate()

# API URL
api_url = "https://api.bithumb.com/public/ticker/ALL_"

# API 데이터 요청 및 JSON 데이터 추출
response = requests.get(api_url)
data = response.json()

# data 필드에서 코인 정보 추출
coin_data = data.get("data")

# "date" 필드 제외
coin_data.pop("date", None)

# 코인 정보를 RDD로 변환
coin_rdd = spark.sparkContext.parallelize(coin_data.items())

# RDD를 DataFrame으로 변환
coin_df = coin_rdd.toDF(["Coin", "Info"])

# Info 필드를 JSON 형태의 컬럼들로 분리
for field in ["opening_price", "closing_price", "min_price", "max_price", "units_traded",
              "acc_trade_value", "prev_closing_price", "units_traded_24H", "acc_trade_value_24H"]:
    coin_df = coin_df.withColumn(field, coin_df["Info"][field])

# 불필요한 컬럼 제거
coin_df = coin_df.drop("Info", "fluctuate_24H", "fluctuate_rate_24H")

# 현재 시간과 각각의 시간 정보를 추출하여 컬럼으로 추가
coin_df = coin_df.withColumn("now_time", current_timestamp())
coin_df = coin_df.withColumn("year", year("now_time"))
coin_df = coin_df.withColumn("month", month("now_time"))
coin_df = coin_df.withColumn("day", dayofmonth("now_time"))
coin_df = coin_df.withColumn("hour", hour("now_time"))
coin_df = coin_df.withColumn("minute", minute("now_time"))

# 이전 데이터프레임 읽어오기
previous_df = spark.read.option("header", "true").csv("coin_data.csv/*.csv")

# 두 데이터프레임을 수직으로 합치기
merged_df = previous_df.union(coin_df)


# 새로운 디렉토리 저장
merged_df.coalesce(1).write.option("header", "true").option("ignoreSuccessFile", "true").csv("coin_data_temp.csv")

# 기존 디렉토리 삭제
os.system("-rm -r coin_data.csv")

# 새로운 디렉터리 이름 변경
os.system("-mv coin_data_temp.csv coin_data.csv")

print("union출력")

# 스파크 세션 종료
spark.stop()

20. 스파크와 몽고db를 연결하여 local에 있는 csv 파일을 다른 서버에 있는 몽고db에 적재한다.

from pyspark.sql import SparkSession
import pymongo

# 스파크 세션 생성
spark = SparkSession.builder \
    .master("local") \
    .appName("CSV to MongoDB") \
    .getOrCreate()

# HDFS에서 CSV 파일 읽어오기
csv_path = "coin_data.csv/*.csv"
csv_df = spark.read.option("header", "true").csv(csv_path)


# MongoDB에 연결
client = pymongo.MongoClient("mongodb://10.0.9.85:27017/")
db = client["coin"]
collection = db["coin_data"]

# 데이터프레임을 MongoDB에 저장
csv_df.write.format("mongo").mode("overwrite").option("uri", "mongodb://10.0.9.85:27017/coin.coin_data").save()


print("성공")

# 스파크 세션 종료
spark.stop()
~