数据采集的几种方式

优采云 发布时间: 2020-08-25 18:30

  数据采集的几种方式

  

  业务系统

  数据采集的几种形式方法一:读备份库

  为了不影响业务系统的正常运行,可以采用读备份库的数据,这样就能及时获取数据进行一些剖析工作,但是有些从业务也会读取备份数据库,还须要考虑一致性和可用性问题。

  

  数据库备份

  方式一: 埋点(pingback)

  可以在后端APP上记录用户点击,滑动速率,停留时间,进入的时间段,最后看的新闻等等信息,这些可以通过网路传输将埋点信息记录出来,用于数据剖析。但是这些方法有可能会对业务系统代码具有一定的侵入性,同时工作量也比较大,存在一定的安全隐患。

  

  埋点

  后端采集数据的service

  /**

* 埋点接收数据

* @param pingBack

* @return

*/

@RequestMapping(path = "/insert", method = RequestMethod.POST)

@ResponseBody

public ApiResponse insert(@RequestBody PingBack pingBack) {

Boolean result = patientService.savePingBack(pingBack);

return new ApiResponse().success(result);

}

  已有的业务系统可以给数据采集系统发送数据

  /**

* pingback方式插入

* @param patient

* @return

*/

@RequestMapping(path = "/insert", method = RequestMethod.POST)

@ResponseBody

public ApiResponse insert(@RequestBody Patient patient) {

try{

Boolean result = patientService.savePatient(patient);

return new ApiResponse().success(result);

}catch (InternalError error){

log.error("insert error");

}finally {

pingBackService.jsonRequest(url+"insert", patient);

}

return null;

}

  方式三: 发送消息的方法

  上述埋点的形式在业务系统忙碌的情况下,会对数据采集系统形成大量的恳求,如果数据处理不及时会把数据采集服务击败,同时为了前馈,这里可以引入消息中间件,如果对时效性要求较高,可以采用推模式对数据采集系统进行推送,如果时效性不是很高,可以采用定时任务拉取数据,再进行剖析。

  同时可以多个系统订阅消息中间件中不同Topic的数据,可以对数据进行重用,后端多个数据剖析系统之间互不影响,减轻了从业务系统采集多份数据的压力。

  

  引入消息中间件

  数据采集Service

  /**

* 消息中间件的方式更新

* @param patient

* @return

*/

@RequestMapping(path = "/update", method = RequestMethod.POST)

@ResponseBody

public ApiResponse update(@RequestBody Patient patient) {

try{

Boolean result = patientService.updatePatient(patient);

return new ApiResponse().success(result);

}catch (InternalError error){

log.error("update error");

}finally {

sendMessageService.send(patient);

}

return null;

}

  中间件发送数据实现(以kafka为例)

  @Service

@Slf4j

public class SendMessageService {

@Autowired

private KafkaTemplate kafkaTemplate;

@Value("topic")

private String topic;

private ObjectMapper om = new ObjectMapper();

public boolean send(Object object){

String objectJson = "";

try {

objectJson = new ObjectMapper().writeValueAsString(object);

} catch (Exception e) {

log.error("can't trans the {} object to json string!", object);

return false;

}

try{

String result = kafkaTemplate.send("mysql-kafka-patient", objectJson).get().toString();

if(result!=null){

return true;

}

}catch (Exception e){

return false;

}

return false;

}

}

  中间件拉取数据:

  @KafkaListener(id = "forward", topics = "mysql-kafka-patient")

public String forward(String data) {

log.info("mysql-kafka-patient "+data);

JSONObject jsonObject1 = JSONObject.parseObject(data);

Message message = (Message) JSONObject.toJavaObject(jsonObject1,Message.class);

messageService.updateMessage(message);

return data;

}

  方式四:读取MySQL中的binlog

  MySQL会把数据的变更(插入和更新)保存在binlog中,需要在my.ini中配置开启,因此采用kafka订阅binlog,会将DB中须要的数组抓取下来,保存在备份库中,进行数据剖析,工作量较小,安全稳定。

  name=mysql-b-source-pingBack

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

tasks.max=1

connection.url=jdbc:mysql://localhost:3306/test?user=root&password=root

# timestamp+incrementing 时间戳自增混合模式

mode=timestamp+incrementing

# 自增字段 id

timestamp.column.name=commenttime

incrementing.column.name=id

# 白名单表 pingBack

table.whitelist=pingBack

# topic前缀 mysql-kafka-

topic.prefix=mysql-kafka-

  具体使用可以参考:官方文档

  分析对比数据采集方式优点缺点

  埋点(pingback)

  很细致的将后端用户操作记录出来,能够感知到DB储存之外的用户信息,时效性高

  工作量大,可能对业务代码有侵入性;当业务量大的时侯,数据抓取服务也须要承载一定的压力,对数据不便捷统计和聚合

  主库写备库读

  及时感知备库中的信息 ,数据一致性强

  可能存在大量不需要进行剖析的数组,对业务性能有影响

  埋点+消息中间件

  有效的解决业务量大时对数据存取性能的要求,根据数据抓取服务的需求可以拉也可以推,解耦业务代码

  可能会遗失数据,降低了时效性

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线