https://medium.com/@sandeepsinh/sqoop-jobs-development-using-python-82e4b0139457

 

Sqoop job development using Python

Sqoop job development using Python

medium.com

기존의 경로에 파일을 있다면 에러가 나기에 파일을 삭제하고 다시 만들어야 한다.

--delete-target-dir --target-dir 'user/hadoop/input

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import subprocess
import logging
 
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
   # Function to run Hadoop command
def run_unix_cmd(args_list):
"""
    run linux commands
    """
    print('Running system command:{0}'.format('     '.join(args_list)))
    s_output, s_err = proc.communicate()
    s_return = proc.returncode
    return s_return, s_output, s_err
 
  # Create Sqoop Job
def sqoop_job():
   """
   Create Sqoop job
   """
   cmd = ['sqoop''import''--connect''jdbc:mysql://192.168.0.35:3309/member?serverTimezone=Asia/Seoul''--username''hadoop','--password''markate123''--tab    le''student''-m''1''--delete-target-dir','--target-dir''/user/hadoop/input']
   print(cmd)
   (ret, out, err) = run_unix_cmd(cmd)
   print(ret, out, err)
   if ret == 0:
      logging.info('Success.')
   else:
      logging.info('Error.')
 
if __name__ == '__main__':
    sqoop_job()
 
 

 

 

'Centos7 > Sqoop' 카테고리의 다른 글

스쿱-MySQL 연동 2. 테스트  (0) 2019.05.09
스쿱-MySQL 연동 1. Sqoop & MySQL Connector 설치 및 설정  (0) 2019.05.09

MySQL 특징

  • Master MySQL은 Slave MySQL에 대한 정보를 전혀 가지고 있지 않다. 이는 slave가 있는지 없는지, 몇개의 slave가 붙어 있는지, 각 slave가 어디까지 data를 복제했는지 모른다는 거다.
  • Master MySQL에서 Data 또는 스키마에 대한 변경 Event가 발생하면 Binary logs에 기록한다. 다시 말하자면, Data 또는 스키마를 변경하려는 Event를 기록한다는 것이다.  이는 Binary logs가 존재하는 한 데이터를 복구 할 수 있다는 뜻이다. 물론 복제 할 때에도 사용된다.
  • Slave MySQL은 어떤 Event까지 저장되어 있는지 기억하고 있다.

허면 어떻게 MySQL Replication가 이루어 지는지 알아볼 필요가 있다.

 

1. Slave는 I/O thread를 통해 Master에게 Event를 요청한다.(slave는 어떤 Event까지 저장되어있는지 알고 있기 때문에 Master에게 그 이후의 Event를 요청)

 

2. Slave가 Event를 요청하면 Master에서는 Binlog dump thread를 통해 Slave에 Event를 보낸다.

Binlog dump thread : Master에 저장된 Binary logs를 Slave에 전달하기 위해 Master에서는 thread를 만드는데 이를 Binlog dump thread라고 한다. Binlog dump thread는 Slave에서 Event를 요청하면 Binary logs에 락을 걸고 안에 기록된 Event 읽고, 락을 풀고 Slave에 전달하는 기능을 가지고 있다.

 

3. Slave는 I/O thread를 통해 받은 Event를 Relay log에 기록한다.

 

4. Slave는 SQL thread를 통해 Event가 기록된 Relay log를 읽고 자신의 데이터를 변경 한다. 그리고 Relay log를 지운다.

'Centos7 > MySQL' 카테고리의 다른 글

GROUP BY 그룹별-월별 최신날짜의 데이터 가져오기  (0) 2020.06.29
MySQL Replication  (0) 2019.05.27

파일명 : [example.py]

설정한 자바 홈을 출력하는 쉘 스크립트 명령어를 파이썬 스크립트로 작성

1
2
3
4
5
#!/usr/bin/env python
 
import subprocess
 
subprocess.call("echo $JAVA_HOME",shell=True)
 

파일 실행

1
2
[hadoop@master ~]$ python example.py
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.212.b04-0.el7_6.x86_64
 

'Centos7 > Python' 카테고리의 다른 글

Python-MySQL 연결하기  (0) 2019.05.10
1
[root@localhost ~]# mysql -u [mysql사용자명] -p [접속하고자 하는 DB이름] -e "mysql query"
 

example)

1
2
3
4
5
6
7
8
9
[hadoop@master ~]$ mysql -u hadoop -p member -"select * from student"
Enter password:
+----+-------+---------------+
| id | name  | phone         |
+----+-------+---------------+
|  1 | smith | 010-1111-1111 |
|  2 | kelly | 010-2222-2222 |
+----+-------+---------------+
[hadoop@master ~]$
 

-u : user의 약자

-p : password의 약자

-e : exxcute의 약자


 

'Centos7' 카테고리의 다른 글

작업 명령어 예약 하는 방법  (0) 2019.05.16

Centos7 환경에서 Python으로 MySQL하고 연동 하고자 한다면 

https://mygameprogamming.tistory.com/41?category=710446

 

Python-MySQL 연결하기

Python-MySQL 연결하기 위해선 당연하게도 Python과 MySQL이 있어야 한다. MySQL version : 5.6.44 Flask version : 1.0.2 Python version 3.6.7 1. Python을 설치하기 위해서는 pip가 선행 설치 되어있어야 한다...

mygameprogamming.tistory.com

Python에서 json 문법을 따른 string은 json문법의 'key'를 통해 'value'에 접근 할 수 있습니다.

 

fetchall() return값의 data type은 list이고,

fetchone() return 값의 data type은 tuple이다.

 

키 값을 통해 접근하려면 data type을 string으로 바꿔 주어야 한다.

 

json.loads()를 사용하는데 괄호 안에는 string 이 들어가야한다.

 

list와 tuple은 들어 갈 수 없으므로 row[0]으로 하면 row라는 행의 0번째 데이터를 뜻하고 data type은 string으로 변한다.

그러므로 다음과 같이 JSON의 키 값으로 통해 접근 할 수 있다. 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from flask import Flask, request, Response, jsonify, render_template
 
app = Flask(__name__)
 
import mysql.connector
import json
 
# MySQL connect
db = mysql.connector.connect(
        host="접속하려는 MySQL이 설치된 IP주소"
        user="MySQL 사용자",
        passwd="MySQL 사용자 비밀번호"
        db='접속하려는 DB명', port=3309
        )
 
# MySQL curs
curs = db.cursor()
 
# Flask framework = route
def index():
        return render_template('index.html')
 
@app.route('/request', methods =['post'])
def check_flask():
        # SQL
        value = request.form['SensorID']
        sql = "select esnData from ESensorTable where esnSensorId = '"+value+"'"
        curs.execute(sql)
        #DATA Fetch
        row = curs.fetchone()
        #convert json to python
        data = json.loads(row[0])
        CO2 = data['CO2']
        CO = data['CO']
        O2 = data['O2']
        H2S = data['H2S']
        VOC = data['VOC']
        Temp = data['Temperature']
        Hum = data['Humidity']
        print(data)
 
        return VOC
 
        db.commit()
 
        curs.close()
 
if __name__=='__main__':
        app.run(host='0.0.0.0',port=5000)
 
db.close()
cs

 

아래와 같이

xxx.xxx.xxx.101

xxx.xxx.xxx.102

xxx.xxx.xxx.103

datanode에 분산 저장한것을 확인 할 수 있습니다.

 

명령어는

hdfs fsck [hdfs상의 파일 경로] -file -blocaks -locations 

 

DatanodeInfoWithStorage[xxx.xxx.xxx.101:50010,DS-df8b44f2-f13b-4379-97e0-5220191f7a8a,DISK] 에서

xxx.xxx.xxx.101 = ip주소

50010 = 포트번호

DS-df8b44f2-f13b-4379-97e0-5220191f7a8a = 저장소 ID. 데이터 노드의 내부 ID입니다. 데이터 노드가 이름 노드에 등록 할 때 할당됩니다.

DISK = storageType. 디스크. 저장소 유형은 RAM_DISK, SSD, DISK 및 ARCHIVE가 될 수 있습니다.

 

참고:

https://stackoverflow.com/questions/34497060/hdfs-fsck-command-output

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
[hadoop@master current]$ hdfs fsck input/test.txt -files -blocks -locations
Connecting to namenode via http://master:50070/fsck?ugi=hadoop&files=1&blocks=1&locations=1&path=%2Fuser%2Fhadoop%2Finput%2Ftest.txt
FSCK started by hadoop (auth:SIMPLE) from /192.168.0.35 for path /user/hadoop/input/test.txt at Wed May 15 09:32:40 KST 2019
/user/hadoop/input/test.txt 76 bytes, 1 block(s):  OK
0. BP-306908151-192.168.0.35-1557215026538:blk_1073742641_1819 len=76 repl=3 [DatanodeInfoWithStorage[xxx.xxx.xxx.102:50010,DS-7417b9d5-2b9a-4ef9-b8eb-afd2ee982a54,DISK], 
DatanodeInfoWithStorage[xxx.xxx.xxx.101:50010,DS-df8b44f2-f13b-4379-97e0-5220191f7a8a,DISK], 
DatanodeInfoWithStorage[xxx.xxx.xxx.103:50010,DS-b7ddf346-b5cd-44b7-8c31-dc051c4a7943,DISK]]
 
Status: HEALTHY
 Total size:    76 B
 Total dirs:    0
 Total files:   1
 Total symlinks:                0
 Total blocks (validated):      1 (avg. block size 76 B)
 Minimally replicated blocks:   1 (100.0 %)
 Over-replicated blocks:        0 (0.0 %)
 Under-replicated blocks:       0 (0.0 %)
 Mis-replicated blocks:         0 (0.0 %)
 Default replication factor:    3
 Average block replication:     3.0
 Corrupt blocks:                0
 Missing replicas:              0 (0.0 %)
 Number of data-nodes:          3
 Number of racks:               1
FSCK ended at Wed May 15 09:32:40 KST 2019 in 11 milliseconds
 
 
The filesystem under path '/user/hadoop/input/test.txt' is HEALTHY
h

+ Recent posts