海量数据挖掘之中联通流量营运系统
优采云 发布时间: 2020-08-21 05:32海量数据挖掘之中联通流量营运系统
从联通运营商的核心网段中把须要的数据发送到ftp服务器上,然后我们那边都会提供ftp的客户端去采集ftp服务器的数据,然后处理以后过来进行剖析。
三、内容辨识模块
把url经过爬虫之后到辨识系统,分析出网站名,主题,类别,(作者)等
将分类体系导出到数据库中,url json封装的内容信息。
大量的日志不断的形成,然后通过行为轨迹提高,通过一个mapreduce,
如果这个数据匹配到了,则将原创行+内容剖析结果信息(从知识库来的数据)导出到提高日志。如果匹配不到的数据就输出到一个待爬清单中。
识别系统:自然语言处理SVM(实时辨识),人工辨识(人工一条条的去辨识),模板辨识(一个网页的内容的位置通常不会变,用xpath来定位到我们所须要查找的节点)
相信学过xml的应当就会使用xpath了,如果不会的话,可以查阅我这篇文章:
在这个项目中庸xpath来做这个模板匹配:例如
对于一个网页的html页面来说,我们可以这样来匹配其标题,例如我们打开搜狐的html,我们可以看见他的这个网页标题是的,所以我们对于这类网站就可以用xpath来定位这个title在那里,然后去获取这title节点中内容
souhu.com
movie_name
/path/.../
当然了,使用xpath和xml去做这些模板匹配有一定的局限性,适用于一个结构十分清晰的网页,例如视频、小说、音乐等,对于那个奇奇怪怪的网友就不适用了。
所以总的来说,对于内容辨识要采用多种方法去做,不要局限于一种,不同类型的网站最好有不同的解决方案。
我们使用自然语言处理来进行剖析的时侯还有问题就是,一个网页的内容太多了,svm剖析有时候不能完全的辨识到我们想要的内容,就像一条新闻,本来这个新闻的主标题才是中国网页的主要内容,而使用自然语言处理系统的话它可能会把新闻下边的广告读成了这个网页的主要内容了,所以这样的话都会有偏差了。当然咯,自然语言剖析还是很有用,那么为了更精确的辨识照料好网页的内容如何办呢。好吧,那当然是最传统的人工读取了,由帅气的实习生们把这种网页一条条的浏览,然后记录这个网页的主要内容!(好吧,不要震惊
,移动就是那么干的)。然后读取大约10万个网页,这样的话就产生了一个规则库,这个就比自然语言处理和模板匹配的结果愈发精确了。
四、知识库url选购
两个知识库,一个规则库(人工剖析的),还有一个实例库(自动剖析系统)。
先把url进行规则剖析,如果有则输出,没有没有则放在实例库,如果实例库也没有了,就放在待爬清单中。
先拿1T的样本数据,然后网址按流量汇总排序下来,总流量的前80%,总条数10万条。
因为只要选购下来就可以了,不需要实时在运行的,所以只要一个job就可以了。
这里我们主要领到一个url和总流量来进行剖析和处理,其他更为复杂的情况这儿就不分享了哦。
我们首先可以在eclipse总新建一个jav工程,导入各类hadoop/lib下边的jar包,或者直接新建一个mapRedecer工程也可以。
新建一个bean类:记得要承继一个Comparable插口。
package cn.tf.kpi;
public class FlowBean implements Comparable{
private String url;
private long upflow;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public long getUpflow() {
return upflow;
}
public void setUpflow(long upflow) {
this.upflow = upflow;
}
public FlowBean(String url, long upflow) {
super();
this.url = url;
this.upflow = upflow;
}
public FlowBean() {
super();
}
@Override
public int compareTo(FlowBean o) {
return (int) (o.getUpflow() - this.upflow) ;
}
@Override
public String toString() {
return "FlowBean [url=" + url + ", upflow=" + upflow + "]";
}
}
然后写主方式:
其实这儿和我之前写的那种用户流量剖析系统有很多类似的地方。
<p>package cn.tf.kpi;
import java.io.IOException;
import java.util.TreeSet;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import cn.tf.kpi.TopURL.TopURLMapper.TopURlReducer;
public class TopURL {
public static class TopURLMapper extends
Mapper {
private Text k = new Text();
private LongWritable v = new LongWritable();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] fields = StringUtils.split(line, "\t");
try {
String url = fields[26];
long upFlow = Long.parseLong(fields[30]);
k.set(url);
v.set(upFlow);
context.write(k, v);
} catch (Exception e) {
e.printStackTrace();
}
}
public static class TopURlReducer extends
Reducer {
private Text k = new Text();
private LongWritable v = new LongWritable();
TreeSet urls = new TreeSet();
//全局流量和
long globalFlowSum =0;
@Override
protected void reduce(Text key, Iterable values,
Context context) throws IOException, InterruptedException {
long count = 0;
for (LongWritable v : values) {
count += v.get();
}
globalFlowSum +=count;
FlowBean bean = new FlowBean(key.toString(), count);
urls.add(bean);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
long tempSum=0;
for(FlowBean bean:urls){
//取前80%的
if(tempSum/globalFlowSum