태그 : Springbatch
2008/08/04 Spring batch 국내 자료링크, 그리고 KSUG 연재
2008/07/10 이미 35개 이상의 Accenture 고객사에 Spring batch가 적용되고 있다 [4]
Hadoop의 Map-Reduce처리에서는 DB를 바로 연결해서 처리할 수 있는 DBInputFormat, DBOutputFormat의 클래스가 제공되고 있습니다.
그러나 이 클래스들은 이름이 'DB'가 붙어있는 것이 무색하게 Oracle과 연결해서 사용해보면 에러가 납니다. DBInputFormat에서는 웹에서의 페이지 처리 쿼리처럼 데이터를 잘라서 가지고 오기 위해 원래 쿼리에다 LIMIT와 OFFSET 키워드를 붙이는데, 이 것은 Oracle에서는 지원되지 않습니다. 그리고 DBOutputFormat에서는 insert문의 맨 뒤에 세미콜론(;)을 붙여버리는데, 이것 역시 Oracle의 JDBC를 사용할 때는 에러를 냅니다.
따라서, 결국 이 클래스들을 Oracle에서 쓸 수 있도록 상속해서 구현을 해 줄수 밖에 없었습니다. 얼핏 생각하면 쿼리만 바꾸어주면 되는 것이니 메소드 하나만 오버라이딩 해주면 될 것으로 예상했으나, 원래 클래스들의 구조가 그 정도로 단순하지 않았습니다.Inner클래스가 많아서 여러 클래스와 메서드들을 다 overriding해 줄 수 밖에 없었습니다. 더군다나, 새로 상속한 클래스의 내부에서 꼭 호출해야 하는 DBConfiguration클래스의 생성자가 public이 아닌 package private(아무것도 선언안한 디폴트 접근자)인 탓에, 패키지를 원래의 DBInputFormat, DBOutputFormat와 같은 패키지로 맞추어야 하는 불편함도 있었습니다. protected로 선언된 메소드들이 많은 것보면 분명히 상속해서 덮어쓰라고 만들어놓은 클래스 같은데, 막상 그렇게 활용하기에는 간편하지 않았던 것이죠.
그리고 또 구조적으로 아쉬운 점은 두 클래스가 같은 DBConfiguration을 보게 있어서 Map에서 입력자료를 얻어오는 DB와 Reduce에서 쓰는 DB가 다를 때는 다시 별도의 클래스를 만들어주어야 한다는 것입니다.
Spring batch에서도 JdbcPagingItemReader라는 약간 유사한 클래스가 있습니다. DBInputFormat이 하나의 쿼리에서 가지고 올 데이터를 동시에 여러번 쿼리해서 나누어 가지고 오는 반면에 JdbcPagingItemReader에서는 부분씩 가지고 오더라도 순차적으로 쿼리를 하는 차이점이 있기는 합니다. 그래도, 페이지 처리 쿼리처럼, 데이터를 나누어서 가지고 오는 쿼리를 제공한다는 점에서는 유사합니다. JdbcPagingItemReader에서는 내부적으로 PagingQueryProvider 라는 인터페이스를 사용하게 되어 있고, 이 인터페이스는 각 DB종류별로 OraclePagingQueryProvider, HsqlPagingQueryProvider, MySqlPagingQueryProvider, , SqlServerPagingQueryProvider,SybasePagingQueryProvider 등의 구현클래스를 가지고 있습니다. Hadoop의 DBInputFormat도 이런 구조였다면 이를 응용하려는 개발자가 훨씬 쉽게 클래스 확장방법을 이해했을 것입니다.
아뭏든 지금까지 현재 공개된 API만으로는 Hadoop의 DB연결 지원 클래스들은 빈약해 보이고, API도 좋은 설계요건을 갖추었다고 느껴지지는 않습니다. 아무래도 포털 등에서 대용량 데이터를 처리하는 곳에 쓰이다보니 DB와 함께 연결되는 쓰임새가 그리 많지는 않았나봅니다. 더군다나 Oracle에서는 한번도 안 돌려본 클래스가 버젓이 DB...로 시작되는 이름으로 들어간 것 보면 Oracle이 쓰이는 동네와 Hadoop이 사는 곳은 아주 멀리 떨어져 있었던 것 같습니다. 그러나, 앞으로 엔터프라이즈 환경에서도 Hadoop이 쓰이려면 DB와의 integration은 반드시 거쳐야할 다리인 것 같습니다. Enterprise 시장에서의 mapreduce 링크를 보아도 이미 그런 시도들이 시작된 것을 알 수 있습니다.
한편, Hadoop의 FileInputFormat가 Spring batch의 FlatFileItemReader와 유사한 것 등이나 Spring batch도 2.0에서 아직 분산, 동시처리 등을 지원하기 시작했다는 점은 두 프레임웍의 겹치는 지점이 늘어날 수도 있다는 생각도 듭니다. 뭐 아직 Spring batch의 분산지원은 걸음마 단계이기는 합니다만, DB에서 HDFS에 들어가는 파일을 쓸 때 Spring batch의 API를 활용하는 것 깉은 활용법은 시도해 볼만하다고 생각됩니다.
package org.apache.hadoop.mapred.lib.db;
import java.io.IOException;
import java.sql.SQLException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
public class OracleInputFormat<T extends DBWritable> extends DBInputFormat<T>{
private DBConfiguration dbConf = null;
private DBInputSplit split;
@Override
public RecordReader<LongWritable,T> getRecordReader(InputSplit split, JobConf job,
Reporter reporter) throws IOException {
dbConf = new DBConfiguration(job);
this.split = (DBInputSplit)split;
@SuppressWarnings("unchecked")
Class inputClass = dbConf.getInputClass();
try {
@SuppressWarnings("unchecked")
RecordReader<LongWritable,T> reader = new OracleRecordReader((DBInputSplit) split, inputClass, job);
return reader;
} catch (SQLException ex) {
throw new IOException(ex.getMessage());
}
}
public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
String inputQuery, String inputCountQuery) {
DBInputFormat.setInput(job, inputClass, inputQuery, inputCountQuery);
job.setInputFormat(OracleInputFormat.class);
}
protected class OracleRecordReader extends DBRecordReader{
protected OracleRecordReader(DBInputSplit split, Class<T> inputClass,
JobConf conf) throws SQLException {
super(split, inputClass, conf);
}
@Override
protected String getSelectQuery() {
long length = 0;
long start = 0;
try{
length = split.getLength();
start = split.getStart();
} catch(IOException e){
throw new IllegalArgumentException
("cannot read length or start variable from DBInputSplit",e);
}
StringBuilder query = new StringBuilder();
query.append(" SELECT * \r\n");
query.append(" FROM (SELECT m.* , ROWNUM rno ");
query.append(" FROM ( ");
query.append( dbConf.getInputQuery());
query.append(" ) m");
query.append(" WHERE ROWNUM <= " + start + " + " + length + ")");
query.append(" WHERE RNO > " + start);
System.out.println(query.toString());
return query.toString();
}
}
}
package org.apache.hadoop.mapred.lib.db;
import org.apache.hadoop.mapred.JobConf;
public class OracleOutputFormat<K extends DBWritable, V> extends DBOutputFormat<DBWritable, V>{
@Override
protected String constructQuery(String table, String[] fieldNames) {
if(fieldNames == null) {
throw new IllegalArgumentException("Field names may not be null");
}
StringBuilder query = new StringBuilder();
query.append("INSERT INTO ").append(table);
if (fieldNames.length > 0 && fieldNames[0] != null) {
query.append(" (");
for (int i = 0; i < fieldNames.length; i++) {
query.append(fieldNames[i]);
if (i != fieldNames.length - 1) {
query.append(",");
}
}
query.append(")");
}
query.append(" VALUES (");
for (int i = 0; i < fieldNames.length; i++) {
query.append("?");
if(i != fieldNames.length - 1) {
query.append(",");
}
}
query.append(")");
return query.toString();
}
public static void setOutput(JobConf job, String tableName, String... fieldNames) {
DBOutputFormat.setOutput(job, tableName, fieldNames);
job.setOutputFormat(OracleOutputFormat.class);
}
}
public class SampleJob {
public static void main(String args[]) throws IOException, URISyntaxException{
JobConf conf = new JobConf(SampleJob.class);
initClasspath(conf);
conf.setJobName("sampleJob");
DBConfiguration.configureDB(conf, "oracle.jdbc.driver.OracleDriver",
"jdbc:oracle:thin:@localhost:1525:TEST",
"myuser", "mypassword");
OracleInputFormat.setInput(conf, Query.class,
"SELECT query, category, user_id FROM query_log ",
"SELECT COUNT(*) FROM query_log");
conf.setOutputKeyClass(Query.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(SampleMapper.class);
conf.setReducerClass(SampleReducer.class);
conf.setCombinerClass(SampleReducer.class);
OracleOutputFormat.setOutput(conf, "category", "user_id","cnt");
JobClient.runJob(conf);
}
private static void initClasspath(JobConf conf) throws URISyntaxException,
IOException {
DistributedCache.addCacheFile(new URI("lib/ojdbc5-11.1.0.6.jar"), conf);
DistributedCache.addFileToClassPath(new Path("lib/ojdbc5-11.1.0.6.jar"), conf);
}
}
http://developer.yahoo.net/blogs/hadoop/DBInputFormat.ppt
http://www.cloudera.com/blog/wp-content/uploads/DBInputFormat.pdf
http://www.cloudera.com/blog/2009/03/06/database-access-with-hadoop/
# by | 2009/03/25 08:03 | 기술 자료 | 트랙백(6) | 덧글(0)
(수정이력: 2008.01.19 - 스프링배치 관련 링크들은 http://www.ksug.org/94로 옮기고, 최근 자료를 포함시켰습니다.)
7월31일에 Springbatch 1.1.1이 나왔군요. 버그수정 외에 특별한 기능의 update 사안은 없기는 합니다만, 진도가 빠른 느낌입니다. ( http://www.springframework.org/node/722 )
Spring batch에 대한 국내 자료도 점점 많이 생겨나고 있습니다. KSUG(한국 스프링사용자모임)에서 만난 분들 중 Spring batch에 관심을 가지고 계신 분들이 몇 분 계셔서, 새로운 자료들도 속속 올라올 것 같군요. 그런 분들과 정보를 주고 받을 수 있으니, 포럼(http://forum.ksug.org) 개설 등 최근 KSUG의 변화에 따른 수혜를 제가 많이 받고 있다고 느껴집니다.
스프링배치 국내 자료 모음 - http://www.ksug.org/94
그리고 제가 마이크로소프트웨어지에 연재하고 있는 글도 시일이 좀 지난 기사에 한해서 담당기자분의 허락을 얻어 KSUG의 블로그에 올리고 있습니다.
이미 잡지에 나왔던 내용이지만, 인쇄되어 나올 원고를 쓸 때와는 달리 링크와 인용을 더 편하게 할 수 있기에 웹사이트에 올릴 수 있는 기회가 온 것을 반가워하고 있습니다. 연재용 원고를 쓸 때는 참조자료의 URL이 들어가면 긴 주소를 독자들이 직접 쳐서 웹사이트에 들어가 볼까 하는 의구심도 들고, 주소가 차지하는 지면 공간도 많고 해서 사소한 것까지 웹사이트 주소를 쓰기가 망설여 지더군요. 웹에 올릴 때는 API문서 등 사소한 링크도 다 넣을 수 있어서 속이 후련했습니다. 그리고 티스토리에서 제공되는 '미주'를 다는 기능도 정말 마음에 듭니다. 개인블로그도 이글루스에서 이사가고 싶은 마음이 생길 정도입니다;
위의 연재 내용을 포함한 스프링에 대한 모든 질문과 토론은 http://forum.ksug.org을 통해서 하시면 됩니다~!
# by | 2008/08/04 00:14 | 기술 자료 | 트랙백 | 덧글(0)
Spring batch 프레임웍은 스프링 프레임웍의 본부인 SpringSource와 컨설팅 회사 Accenture가 협업해서 개발하고 있습니다. SpringSource의 기술력과 Accenture의 현장경험이 합쳐진 것이죠. 작년 5월에 국내에도 이 소식이 보도된 기사가 있군요.
Spring batch 포럼에서 보면 이미 현장에서 적용하고 있는 사람들이 많아 보이기는 해도, 아직 얼마나 쓰이고 있는지 짐작하기는 어려웠습니다. 그런데, 얼마전에 Accenture에서 내놓은 기사에 따르면 이미 35개 이상의 고객사에서 활용되고 있다고 합니다.
지난 5월에 Spring batch에 대해서 공개 세미나에서 발표를 한 적이 있었습니다. 그 때 받은 질문 중에 하나가 '금융권 등에서는 민감하고 고성능이 필요한 프로그램들이 배치로 돌고 있고, DB의 Stored procedure로 되어 있는 것이 많다. 그런 것들이 다 SpringBatch로 가능하겠느냐'는 것이였습니다. 그 때 저는 '성능에 대해서는 구체적으로 나온 자료가 없다. DB를 많이 쓰는 작업이라면 DB내부에서 돌아가는 Stored Procedure가 더 빠를 가능성이 크다. 그러나 배치 어플리케이션의 프로그래밍에서 추구할 우선적인 가치는 업무의 특성에 따라 다를 것이다. 그것이 성능일 때도 있고, 생산성이나 유지보수성일 때도 있다. 빠른 성능이 절대적으로 중요한 프로그램이 Spring batch의 모듈을 썼을 때 그 기대치가 안 나온다면 Store procedure로 주요부분을 짜고 그것을 Spring batch 내부에서 호출하는 방법도 가능하다.'라고 대답을 했었습니다.
그 때 위의 기사의 사례들을 알았다면 '이미 Accenture의 많은 고객사에서 사용하고 있는 것으로 보아 기업환경의 실무에서 충분히 사용 가능한 수준이라고 말할 수 있다.'라는 말을 덧붙일수 있었을 건데 아쉽군요.
우리나라의 SI에서도 적용될만한 곳이 분명 있을 것입니다.
◀ 이전 페이지 다음 페이지 ▶