综合项目1-07 唯一ID标识项目具体实现

标签: 01-实战项目

9. 唯一ID项目实现代码

9.1 分析:日志预处理最大连通子图

日志来源:web端埋点日志、app端日志、微信小程序日志

实现思路:

1、加载当日的所有来源数据

2、提取出每类数据中每一行的各种用户的标识(uid,imei,mac,androidid,uuid,imsi等)

3、根据提取出来的这些标识 ,生成图计算中的点Vertexies、边Edge集合

4、将点、边集合,构造成一张图Graph

5、调用Graph中的最大连通子图算法,找到结果图

6、再从结果图中,取出所有的点集合

 日志格式:

{
	"eventid": "appClickEvent",
	"event": {
		"screen_id": "344",
		"screen_name": "",
		"title": "",
		"element_id": "4"
	},
	"user": {
		"uid": "245498",
		"account": "",
		"email": "",
		"phoneNbr": "18248667380",
		"birthday": "",
		"isRegistered": "",
		"isLogin": "",
		"addr": "",
		"gender": "",
		"phone": {
			"imei": "2881993463620531",
			"mac": "2e-80-50-8e-39-a1-1e",
			"imsi": "8616932323350461",
			"osName": "macos",
			"osVer": "9.0",
			"androidId": "",
			"resolution": "1024*768",
			"deviceType": "360_V",
			"deviceId": "81Kau4",
			"uuid": "L3whyU7BgtLKEkvE"
		},
		"app": {
			"appid": "com.51doit.mall",
			"appVer": "2.0.1",
			"release_ch": "纽扣助手",
			"promotion_ch": "12"
		},
		"loc": {
			"areacode": 210921102,
			"longtitude": 121.56605311428365,
			"latitude": 41.91452099352481,
			"carrier": "ISP02",
			"netType": "WIFI",
			"cid_sn": "463485993989",
			"ip": "138.117.92.76"
		},
		"sessionId": "sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"
	},
	"timestamp": "1575548955000"
}

 需要提取:

user->uid

user->phone->imei\mac\imsi\androidid\deviceid\uuid

package pub.ryan.dw.pub.ryan.dw.idmp

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import pub.ryan.commons.util.SparkUtil

import scala.collection.immutable

//埋点日志id映射:电脑、手机、微信日志
//user->uid
//user->phone->imei/mac/imsi/androidId/deviceId/uuid
object LogDataidmp {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkUtil.getSparkSession()
    //导隐式转换
    import sparkSession.implicits._
    //获取三类数据
    val weblog: Dataset[String] = sparkSession.read.textFile("D:\\Project\\p01\\logs\\2020-01-11\\web")
    val applog: Dataset[String] = sparkSession.read.textFile("D:\\Project\\p01\\logs\\2020-01-11\\app")
    val wxapplog: Dataset[String] = sparkSession.read.textFile("D:\\Project\\p01\\logs\\2020-01-11\\wxapp")
    //提取每一类数据中每一行的标识字段

    val appids: RDD[Array[String]] = logds(applog)
    val weblogids: RDD[Array[String]] = logds(weblog)
    val wxapplogids: RDD[Array[String]] = logds(wxapplog)
    //将所有log拼接成一个
    val ids: RDD[Array[String]] = appids.union(weblogids).union(wxapplogids)
    //构造图中的点集合
    val vertices: RDD[(Long, String)] = ids.flatMap(arr => {
      for (ele <- arr) yield (ele.hashCode.toLong, ele)
    })
    //构造图中的边集合 各种组合
    val edges: RDD[Edge[String]] = ids.flatMap(arr => {
      //双层for对数组中所有标识进行两两组合
      for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1) yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
    })
      //将每个组合进行类wordcount计数,计算边出现的数次,并过滤边小于2的情况
      .map(edge => (edge, 1))
      .reduceByKey(_ + _)
      .filter(tp => tp._2 > 2)
      .map(tp => tp._1)

    //构造图,并调用最小路径算法
    val graph: Graph[String, String] = Graph(vertices, edges)
    //存储的就是RDD(id, id值)
    val res_vertex: VertexRDD[VertexId] = graph.connectedComponents().vertices

    //再利用图计算的值作为新的日志gid保存到项目目录下
    res_vertex.toDF("gid_hc", "gid").write.parquet("data\\idmp\\2020-01-11\\")
    sparkSession.close()
  }

  //提取标识,因为它们的格式一样,所以不用处理不同,如果日志不同时记得要自己来处理
  private def logds(applog: Dataset[String]): RDD[Array[String]] = {
    applog.rdd.map(line => {
      val jsonObject: JSONObject = JSON.parseObject(line)
      val userObj: JSONObject = jsonObject.getJSONObject("user")
      val uid: String = userObj.getString("uid")
      val phoneObj: JSONObject = userObj.getJSONObject("phone")
      val imei: String = phoneObj.getString("imei")
      val mac: String = phoneObj.getString("mac")
      val imsi: String = phoneObj.getString("imsi")
      val androidId: String = phoneObj.getString("androidId")
      val deviceId: String = phoneObj.getString("deviceId")
      val uuid: String = phoneObj.getString("uuid")

      Array(uid, imei, mac, imsi, androidId, deviceId, uuid).filter(StringUtils.isNotBlank(_))
    })
  }
}

可通过测试类读取生成的结果:

import org.apache.spark.sql.SparkSession

//读取存入的parquet
object ReadParquet {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    val dataFrame = sparkSession.read.parquet("data/idmp/2020-01-11")
    dataFrame.show(100, false)
  }
}

 

 

9.2 合并历史字典:将今日字典与前一天进行比对合并

实现思路:
1、将上一日的idmp映射字典解析成点、边集合
2、将当日的点合并上一日的点集合,当日的边合并上一日的边集合
3、再用所有点集合、边集合来构造图,并利用算法求出最大连通子图
4、将结果与上一日的映射字典作对比,调用最大连通子图

核心代码段:

取今天数据:(同昨天一样)

    val sparkSession: SparkSession = SparkUtil.getSparkSession()
    //导隐式转换
    import sparkSession.implicits._
    //获取今天三类数据
    val weblog: Dataset[String] = sparkSession.read.textFile("D:\\Project\\p01\\logs\\2020-01-12\\web")
    val applog: Dataset[String] = sparkSession.read.textFile("D:\\Project\\p01\\logs\\2020-01-12\\app")
    val wxapplog: Dataset[String] = sparkSession.read.textFile("D:\\Project\\p01\\logs\\2020-01-12\\wxapp")
    //提取每一类数据中每一行的标识字段

    val appids: RDD[Array[String]] = logds(applog)
    val weblogids: RDD[Array[String]] = logds(weblog)
    val wxapplogids: RDD[Array[String]] = logds(wxapplog)
    //将所有log拼接成一个
    val ids: RDD[Array[String]] = appids.union(weblogids).union(wxapplogids)
    //构造图中的点集合
    val vertices: RDD[(Long, String)] = ids.flatMap(arr => {
      for (ele <- arr) yield (ele.hashCode.toLong, ele)
    })
    //构造图中的边集合 各种组合
    val edges: RDD[Edge[String]] = ids.flatMap(arr => {
      //双层for对数组中所有标识进行两两组合
      for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1) yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
    })
      //将每个组合进行类wordcount计数,计算边出现的数次,并过滤边小于2的情况
      .map(edge => (edge, 1))
      .reduceByKey(_ + _)
      .filter(tp => tp._2 > 2)
      .map(tp => tp._1)

合并:

1、将上一日的idmp映射字典解析成点、边集合

2、将当日的点合并上一日的点集合,当日的边合并上一日的边集合

    // 将上一日的idmp映射字典解析成点、边集合
    // 昨天输出的是parquet 加载昨天的字典
    val yestodayIdmp: DataFrame = sparkSession.read.parquet("data\\idmp\\2020-01-11\\")
    val yestodayIdmpVertices: RDD[(VertexId, String)] = yestodayIdmp.rdd.map({
      case Row(idFlag: VertexId, guid: VertexId) => (idFlag, "")
    })
    val yestodayEdges: RDD[Edge[String]] = yestodayIdmp.rdd.map(row => {
      val idFlag: VertexId = row.getAs[VertexId]("gid_hc")
      val gid: VertexId = row.getAs[VertexId]("gid")
      Edge(idFlag, gid, "")
    })

    //将当日的点合并上一日的点集合,当日的边合并上一日的边集合,再用所有点集合、边集合来构造图,并利用算法求出最大连通子图
    val graph: Graph[String, String] = Graph(vertices.union(yestodayIdmpVertices), edges.union(yestodayEdges))
    val res_vertex: VertexRDD[VertexId] = graph.connectedComponents().vertices

    // 将今天的图结果与上日的映射字典做对比调整guid
    // 1、将上日的idmp映射结果字典收集到driver端,并广播
    val idMap: collection.Map[VertexId, VertexId] = yestodayIdmp.rdd.map(row => {
      val idFlag: VertexId = row.getAs[VertexId]("gid_hc")
      val gid: VertexId = row.getAs[VertexId]("gid")
      (idFlag, gid)
    }).collectAsMap()
    val broadcastValue: Broadcast[collection.Map[VertexId, VertexId]] = sparkSession.sparkContext.broadcast(idMap)

    // 2、将今天的图计算结果按照guid分组
    val todayImplResult: RDD[(VertexId, VertexId)] = res_vertex.map(tp => (tp._2, tp._1))
      .groupByKey()
      // 将广播变量放在map循环外面,一个区取一次
      .mapPartitions(iter => {
        // 从广播变量中取出上日的idmp映射字典
        val idmpMap: collection.Map[VertexId, VertexId] = broadcastValue.value
        iter.map(tp => {
          //当日guid计算结果
          var todayGid: VertexId = tp._1
          // 这一组是的所有id标识
          val todayids: Iterable[VertexId] = tp._2
          //遍历当日id, 挨个去上日的idmp映射字典中查找
          var find = false
          for (elem <- todayids if !find) {
            val maybeGid: Option[VertexId] = idmpMap.get(elem)
            // 如果这个id在昨天的映射陪爸妈中找到了,那么就用昨天的guid替换掉今天这一组的guid
            if (maybeGid.isDefined) {
              // 将昨天的guid替换今天的guid,退出当前循环
              todayGid = maybeGid.get
              find = true
            }
          }
          //返回合并后的今日最新的guid及数据
          (todayGid, todayids)
        })
      }).flatMap(tp => {
      val ids: Iterable[VertexId] = tp._2
      val gid: VertexId = tp._1
      for (elem <- ids) yield (elem, gid)
    })

最后将今天的字典输出即可。

完整代码:

package pub.ryan.dw.pub.ryan.dw.idmp

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import pub.ryan.commons.util.SparkUtil

// 加入上一日的日志数据
class LogDataidmp2 {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkUtil.getSparkSession()
    //导隐式转换
    import sparkSession.implicits._
    //获取今天三类数据
    val weblog: Dataset[String] = sparkSession.read.textFile("D:\\Project\\p01\\logs\\2020-01-12\\web")
    val applog: Dataset[String] = sparkSession.read.textFile("D:\\Project\\p01\\logs\\2020-01-12\\app")
    val wxapplog: Dataset[String] = sparkSession.read.textFile("D:\\Project\\p01\\logs\\2020-01-12\\wxapp")
    //提取每一类数据中每一行的标识字段

    val appids: RDD[Array[String]] = logds(applog)
    val weblogids: RDD[Array[String]] = logds(weblog)
    val wxapplogids: RDD[Array[String]] = logds(wxapplog)
    //将所有log拼接成一个
    val ids: RDD[Array[String]] = appids.union(weblogids).union(wxapplogids)
    //构造图中的点集合
    val vertices: RDD[(Long, String)] = ids.flatMap(arr => {
      for (ele <- arr) yield (ele.hashCode.toLong, ele)
    })
    //构造图中的边集合 各种组合
    val edges: RDD[Edge[String]] = ids.flatMap(arr => {
      //双层for对数组中所有标识进行两两组合
      for (i <- 0 to arr.length - 2; j <- i + 1 to arr.length - 1) yield Edge(arr(i).hashCode.toLong, arr(j).hashCode.toLong, "")
    })
      //将每个组合进行类wordcount计数,计算边出现的数次,并过滤边小于2的情况
      .map(edge => (edge, 1))
      .reduceByKey(_ + _)
      .filter(tp => tp._2 > 2)
      .map(tp => tp._1)

    // 将上一日的idmp映射字典解析成点、边集合
    // 昨天输出的是parquet 加载昨天的字典
    val yestodayIdmp: DataFrame = sparkSession.read.parquet("data\\idmp\\2020-01-11\\")
    val yestodayIdmpVertices: RDD[(VertexId, String)] = yestodayIdmp.rdd.map({
      case Row(idFlag: VertexId, guid: VertexId) => (idFlag, "")
    })
    val yestodayEdges: RDD[Edge[String]] = yestodayIdmp.rdd.map(row => {
      val idFlag: VertexId = row.getAs[VertexId]("gid_hc")
      val gid: VertexId = row.getAs[VertexId]("gid")
      Edge(idFlag, gid, "")
    })

    //将当日的点合并上一日的点集合,当日的边合并上一日的边集合,再用所有点集合、边集合来构造图,并利用算法求出最大连通子图
    val graph: Graph[String, String] = Graph(vertices.union(yestodayIdmpVertices), edges.union(yestodayEdges))
    val res_vertex: VertexRDD[VertexId] = graph.connectedComponents().vertices

    // 将今天的图结果与上日的映射字典做对比调整guid
    // 1、将上日的idmp映射结果字典收集到driver端,并广播
    val idMap: collection.Map[VertexId, VertexId] = yestodayIdmp.rdd.map(row => {
      val idFlag: VertexId = row.getAs[VertexId]("gid_hc")
      val gid: VertexId = row.getAs[VertexId]("gid")
      (idFlag, gid)
    }).collectAsMap()
    val broadcastValue: Broadcast[collection.Map[VertexId, VertexId]] = sparkSession.sparkContext.broadcast(idMap)

    // 2、将今天的图计算结果按照guid分组
    val todayImplResult: RDD[(VertexId, VertexId)] = res_vertex.map(tp => (tp._2, tp._1))
      .groupByKey()
      // 将广播变量放在map循环外面,一个区取一次
      .mapPartitions(iter => {
        // 从广播变量中取出上日的idmp映射字典
        val idmpMap: collection.Map[VertexId, VertexId] = broadcastValue.value
        iter.map(tp => {
          //当日guid计算结果
          var todayGid: VertexId = tp._1
          // 这一组是的所有id标识
          val todayids: Iterable[VertexId] = tp._2
          //遍历当日id, 挨个去上日的idmp映射字典中查找
          var find = false
          for (elem <- todayids if !find) {
            val maybeGid: Option[VertexId] = idmpMap.get(elem)
            // 如果这个id在昨天的映射陪爸妈中找到了,那么就用昨天的guid替换掉今天这一组的guid
            if (maybeGid.isDefined) {
              // 将昨天的guid替换今天的guid,退出当前循环
              todayGid = maybeGid.get
              find = true
            }
          }
          //返回合并后的今日最新的guid及数据
          (todayGid, todayids)
        })
      }).flatMap(tp => {
      val ids: Iterable[VertexId] = tp._2
      val gid: VertexId = tp._1
      for (elem <- ids) yield (elem, gid)
    })

    //再利用图计算的值作为新的日志gid保存到项目目录下 最后将字典写到今天目录下
    todayImplResult.toDF("gid_hc", "gid").write.parquet("data\\idmp\\2020-01-12\\")
    sparkSession.close()
  }

  //提取标识,因为它们的格式一样,所以不用处理不同,如果日志不同时记得要自己来处理
  private def logds(applog: Dataset[String]): RDD[Array[String]] = {
    applog.rdd.map(line => {
      val jsonObject: JSONObject = JSON.parseObject(line)
      val userObj: JSONObject = jsonObject.getJSONObject("user")
      val uid: String = userObj.getString("uid")
      val phoneObj: JSONObject = userObj.getJSONObject("phone")
      val imei: String = phoneObj.getString("imei")
      val mac: String = phoneObj.getString("mac")
      val imsi: String = phoneObj.getString("imsi")
      val androidId: String = phoneObj.getString("androidId")
      val deviceId: String = phoneObj.getString("deviceId")
      val uuid: String = phoneObj.getString("uuid")

      Array(uid, imei, mac, imsi, androidId, deviceId, uuid).filter(StringUtils.isNotBlank(_))
    })
  }
}

其它优化:考虑到hashcode会冲突,可以把它换为md5或其它方式加密gid后就不会冲突了

 

Gitee位置

https://gitee.com/bigdataimplicit/cooperation.git

 

版权声明:本文为qq_36269641原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_36269641/article/details/109623392

智能推荐

bireme数据源同步工具--debezium+kafka+bireme

1、介绍 Bireme 是一个 Greenplum / HashData 数据仓库的增量同步工具。目前支持 MySQL、PostgreSQL 和 MongoDB 数据源 官方介绍文档:https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md 1、数据流 Bireme 采用 DELETE + COPY 的方式,将数据源的修改记...

一致性hash算法

散列(hash)在我看来就是一个数组,而与数组不同的点在于数组是按顺序写入的,而hash是按照一定的hash算法确定元素在数组中的位置的。hash最难的问题在于会有冲突出现,如果两个object根据相应的hash算法得出的值一样便产生了hash冲突。在所有解决hash冲突的方法中,我最欣赏的是链式解决法,即将hash到同一位置的元素用链表连接。当然还有其它几种处理hash冲突的算法,比如建立公共溢...

OpenCV-Python learning-1.安装,图片读取显示

1. OpenCV与OpenGL区别 https://www.zhihu.com/question/20212016 一个是让机器识别东西的,OpenCV是给电脑做眼睛的。 一个是让机器计算出更好画面的,OpenGL用在游戏渲染方面很多。 OpenCV(Open Source Computer Vision Library)是一个基于(开源)发行的跨平台计算机视觉库,OpenGL(全写Open G...

Mycat+Mysql分布式架构改造和性能压力测试

架构实现 Mycat作为数据库高可用中间件具备很多的功能,如负载均衡,分库分表,读写分离,故障迁移等。结合项目的实际情况,分库分表功能对于关联查询有很高的要求,需要从业务角度考虑分库分表后的关联查询SQL的分析,业务代码动作较大,所以在此方案中我们不考虑分库分表。主要应用Mycat的负载均衡及故障迁移的功能即可。 整个架构改造包括两个部分,第一是单例Mysql改为多个Mysql,同时负载均衡,并且...

人脸识别之疲劳检测(二)阈值法、KNN分类和K-means聚类

Table of Contents 1、均值法 2、中值法 3、KNN 4、K-means 结合上一节在获得人眼特征点后需要对睁眼闭眼状态做出判断,方法的选择需要经验结合公平的评价方法,使用大量测试集得到不同方法下的精确度并做出比较: 1、均值法 50帧睁眼数据取均值,得到不同阈值下精确度。 2、中值法 50帧睁眼数据取中值,得到不同阈值下精确度。 3、KNN KNN是一种ML常用分类算法,通过测...

猜你喜欢

CodeForce Tic-Tac-Toe

Two bears are playing tic-tac-toe via mail. It's boring for them to play usual tic-tac-toe game, so they are a playing modified version of this game. Here are its rules. The game is played on the foll...

Python雾里看花-抽象类ABC (abstract base class)

首先认识模块 abc,python中没有提供抽象类与抽象方法,然而提供了内置模块abc来模拟实现抽象类,例如提供泛映射类型的抽象类 abc.MutableMapping 继承abc.MutableMapping构造一个泛映射类型(类似python中的dict) 当然继承abc.Mapping 也可以,毕竟MutableMapping是其子类 dict是python中典型的映射类型数据结构,其接口的...

python 文件操作

2, with open (‘xx.txt’,‘w’,encoding=‘utf-8’) as f: f.write(‘文件内容或对象’)...

【Python基础】使用统计函数绘制简单图形

机器学习算法与自然语言处理出品 @公众号原创专栏作者 冯夏冲 学校 | 哈工大SCIR实验室在读博士生 2.1 函数bar 用于绘制柱状图 2.2 函数barh 用于绘制条形图 2.3 函数hist 用于绘制直方图 直方图与柱状图的区别 函数pie 用于绘制饼图 2.5 函数polor 用于绘制极线图 极线图是在极坐标系上绘出的一种图。在极坐标系中,要确定一个点,需要指明这个点距原点的角...

css:顶部按钮固定,上面内容滑动

这种需求我们平时见到很多的,实现方法也多的参差不齐,下面我说一种简单的。如图: 可以看到只有红线部分滚动,底下按钮是固定的。 代码...