pyspark建模(类似于dwd层),flask直接对接前端请求进行召回(类似于ads层,但是不保存)

news/2025/1/4 19:10:03/文章来源:https://www.cnblogs.com/zwnfdswww/p/18527567

2. Spark MLib

2.1 Spark MLib 开发环境准备

2.1.1 配置python和spark环境

安装Python环境

安装Anaconda3-5.2.0-Windows-x86_64.exe配置环境变量Anaconda_HOME  
E:\20241014_Soft\Anaconda3PATH
%Anaconda_HOME%Scripts;%Anaconda_HOME%Library\mingw-w64\bin;%Anaconda_HOME%Library\usr\bin;%Anaconda_HOME%Library\bin
打开AnacondaPromtconda --version

安装spark环境

Windows下配置Spark运行环境及环境变量
spark-2.4.5-bin-hadoop2.7.tgz解压spark的安装包到磁盘目录
D:\Code\Soft\spark\spark-2.4.5-bin-hadoop2.7在环境变量中配置SPARK_HOME指定解压的路径SPARK_HOME  D:\Code\Soft\spark\spark-2.4.5-bin-hadoop2.7
将解压的spark安装包中的
D:\Code\Soft\spark\spark-2.4.5-bin-hadoop2.7\python\lib
复制到anaconda对应的目录下E:\20241014_Soft\Anaconda3\Lib\site-packages

image-20241019231052064

Step3:验证py4j是否安装成功,进入python环境,输入import py4j

image-20241019231750287

Step5:使用import导入pyspark模块,如果没错即安装成功。import pyspark

image-20241019231829943

2.1.2 idea安装python插件

image-20241019232018768

image-20241019232045876

image-20241019232123784

新建一个python项目,pyspark_test

统计每个职位投递总次数 & 投递总人数统计指定地区的投递的总人数 & 总次数统计每个地区投递次数最多职位topN
见pyspark_test代码

新建一个python项目,sparkmlib

线性回归模型(连续变量)
逻辑回归模型(分类变量)
决策树模型(分类变量)
随机森林模型(分类变量)
见sparkmlib代码

2.1.3 安装spark 集群

下载并安装
spark-2.4.5-bin-hadoop2.7.tgzcd /opt/lagou/software/tar zxvf spark-2.4.5-bin-hadoop2.7.tgz
sudo chown -R root:root spark-2.4.5-bin-hadoop2.7
sudo chmod -R 755 spark-2.4.5-bin-hadoop2.7mv spark-2.4.5-bin-hadoop2.7 ../servers/spark-2.4.5
配置
vi /etc/profile
export SPARK_HOME=/opt/lagou/servers/spark-2.4.5
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
source /etc/profile文件位置:
cd $SPARK_HOME/conf
修改文件:slaves、spark-defaults.conf、spark-env.sh、log4j.propertiescp log4j.properties.template log4j.properties
cp slaves.template slaves
cp spark-defaults.conf.template spark-defaults.conf
cp spark-env.sh.template spark-env.shvi slaveslinux121
linux122
linux123vi spark-defaults.confspark.master spark://linux121:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://linux121:9000/spark-eventlog
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 512m
spark.yarn.jars hdfs:///spark-yarn/jars/*.jar修改spark-env.shvi spark-env.shexport JAVA_HOME=/opt/lagou/servers/jdk1.8.0_421
export HADOOP_HOME=/opt/lagou/servers/hadoop-2.7.3
export HADOOP_CONF_DIR=/opt/lagou/servers/hadoop-2.7.3/etc/hadoop
export SPARK_DIST_CLASSPATH=$(/opt/lagou/servers/hadoop-2.7.3/bin/hadoop classpath)
export SPARK_MASTER_HOST=linux121
export SPARK_MASTER_PORT=7077vi $HADOOP_HOME/etc/hadoop/yarn-site.xml新增<property><name>yarn.nodemanager.pmem-check-enabled</name><value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property># 配置spark-sql读取hive的元数据##将hive-site.xml 软连接到spark的conf配置目录中:
cd $SPARK_HOME/conf
ln -s $HIVE_HOME/conf/hive-site.xml hive-site.xmlvi hive-site.xml修改<property><name>hive.metastore.uris</name><value>thrift://linux122:9083</value><description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description></property>scp -r hive-site.xml linux121:/opt/lagou/servers/spark-2.4.5/confscp -r hive-site.xml linux123:/opt/lagou/servers/spark-2.4.5/conf##将连接 mysql-connector-java-5.1.35-bin.jar拷贝到spark的jars目录下
cp $HIVE_HOME/lib/mysql-connector-java-5.1.46.jar  $SPARK_HOME/jars将Spark软件分发到集群;修改其他节点上的环境变量cd /opt/lagou/servers/
scp -r spark-2.4.5/ linux122:$PWD
scp -r spark-2.4.5/ linux121:$PWD
scp -r spark-2.4.5/ linux123:$PWDsource /etc/profile注意:使用pyspark读取Hive 外部表(Hive 映射Hbase),需要额外准备Hbase,Hive相关Jar包到Spark。
cp $HBASE_HOME/lib/hbase-*.jar $SPARK_HOME/jars/
cp $HIVE_HOME/lib/hive-*.jar $SPARK_HOME/jars/# 将 $SPARK_HOME/jars 下的jar包上传到hdfs创建 HDFS 目录:
hdfs dfs -rm -r /spark-eventlog
hdfs dfs -rm -r /spark-yarn
hdfs dfs -mkdir /spark-eventlog
hdfs dfs -mkdir -p /spark-yarn/jars/cd $SPARK_HOME/jars
hdfs dfs -put * /spark-yarn/jars/额外补充:(linux122),重要(ps:因为一些版本匹配问题,所以有可能跑不通,需要先尝试跑通hbase,再尝试跑通spark_sql,唯一的办法是提前确定版本,找jar包解决不了问题)rm -rf $SPARK_HOME/jars/metrics-core-4.1.1.jar
scp $HBASE_HOME/lib/metrics-core-4.1.1.jar linux122:$SPARK_HOME/jars
scp $HBASE_HOME/lib/metrics-core-2.1.3.jar linux122:$HBASE_HOME/lib/
hdfs dfs -rm -r /spark-yarn/jars/metrics-core-2.1.3.jar
rm -rf $SPARK_HOME/jars/lz4-java-1.4.0.jar
scp lz4-java-1.8.1.jar linux122:$SPARK_HOME/jars/
rm -rf $SPARK_HOME/jars/metrics-core-2.2.0.jar
hdfs dfs -rm -r /spark-yarn/jars/lz4-java-1.8.1.jarcd $SPARK_HOME/jars/rm -rf parquet-hadoop-bundle-1.6.0.jarhdfs dfs -rm -r /spark-yarn/jars/lz4-java-1.4.0.jar
hdfs dfs -put lz4-java-1.4.1.jar /spark-yarn/jars/hdfs dfs -put $HBASE_HOME/lib/metrics-core-4.1.1.jar  /spark-yarn/jars/scp $SPARK_HOME/jars/*.jar linux121:$SPARK_HOME/jars/
scp $SPARK_HOME/jars/*.jar linux123:$SPARK_HOME/jars/rm -rf $SPARK_HOME/jars/lz4-java-1.4.1.jarhdfs dfs -rm -r /spark-yarn/jars/lz4-java-1.4.1.jarhdfs dfs -get /spark-yarn/jars/* 

启动(前提:Hadoop的 HDFS、Yarn、HistoryServer 正常;Spark historyserver服务正常;)

scp lz4-java-1.8.0.jar linux122:$SPARK_HOME/jars/
hdfs dfs -put lz4-java-1.8.0.jar /spark-yarn/jars/cd $SPARK_HOME/sbin./stop-all.sh
./start-all.sh#linux122
ps aux | grep metastorehive --service metastore &
#linux121
spark-sql --master yarn
spark-shell

测试


http://192.168.49.121:8080/
http://192.168.49.121:18080/

2.1.4 激活python3环境,并且启动jupyter notebook


# 创建一个名为 'spark-env' 的新环境,使用 Python 3.7
conda create -n spark-env python=3.7# 激活新环境
conda activate spark-env安装
conda install py4j
conda install jieba
conda install pyspark==2.4.5
conda install pyhive
conda install happybase==1.2.0
conda uninstall jupyter
# 或者安装 Jupyter Notebook 6.0.3
conda install notebook=6.0.3cd /opt/soft
mkdir -p /opt/soft/conda
chmod 777 /opt/soft/conda
scp -r conda/ linux121:/opt/soft
scp -r conda/ linux123:/opt/soft/opt/soft/conda
修改其它节点上的环境变量
新增
vim /etc/profile
export CONDA_HOME=/opt/soft/conda
export PATH=$PATH:$CONDA_HOME/binsource /etc/profile新建项目目录mkdir -p /root/data/code/job_recommended/
chmod 777 /root/data/code/job_recommended/
cd /root/data/code/job_recommended/启动在项目目录开启
# export TZ=Asia/Shanghai
# export LANG=en_US.UTF-8jupyter notebook --port=8889 --ip=0.0.0.0 --no-browser --allow-root
use ods;
show tables;
desc ods_position;

导入sql文件

hive -f /root/data/user_action.sql

进入jupyter notebook,新建一个concat_fields.ipynb

代码见/root/data/code/job_recommended/concat_fields.ipynb

python3位置:/opt/soft/conda/envs/superset/bin

在hive中新建相应的表,将结果插入hive中的表

drop table `ods.ods_position_content`;
CREATE TABLE `ods.ods_position_content`(
`id` string,
`region` string,
`position_category` string,
`content` string)
row format delimited fields terminated by ',';

2.1.4 TFIDF

新建目录,将文件放入

sudo mkdir -p /data/words
ITKeywords.txt   stopwords.txt

新建文件夹

hdfs dfs -rm -r /lgns/lg_models/
hdfs dfs -mkdir -p /lgns/lg_models/

在hive中新建相应的表

drop table `idf_keywords_values`;
CREATE TABLE idf_keywords_values(
keyword STRING comment "keyword",
idf DOUBLE comment "idf",
index INT comment "index");
-- 职位tfidf保存
CREATE TABLE tfidf_keywords_values(
position_id INT comment "position_id",
region string comment "region",
keyword STRING comment "keyword",
tfidf DOUBLE comment "tfidf");

新建文件compute_tfidf.ipynb

见compute_tfidf.ipynb

2.1.5 TextRank

创建textrank_keywords_values表

drop table if exists textrank_keywords_values ;
CREATE TABLE textrank_keywords_values(
position_id INT comment "position_id",
region String comment "region",
industry String comment "industry",
keyword STRING comment "keyword",
textrank DOUBLE comment "textrank");

新建文件compute_textrank.ipynb

见compute_textrank.ipynb代码

新建表

create table position_profile(
position_id String,
region String,
keywords MAP<String,String>,
topics ARRAY<String>
);drop table if exists position_vector;
CREATE TABLE position_vector(
position_id String comment "position_id",
region String comment "region",
position_vector ARRAY<double> comment "keyword")
row format delimited fields terminated by "/t" collection items terminated by
',';

新建文件word2vec.ipynb文件,计算职位画像结果和职位相似度

见word2vec.ipynb代码

hbase新建表

disable 'position_similar'
drop 'position_similar'
create 'position_similar', 'similar'
# 存储格式如下:key:为position_id, 'similar:position_id', 结果为相似度
put 'position_similar', '1', 'similar:2', 0.34
put 'position_similar', '1', 'similar:3', 0.267
put 'position_similar', '1', 'similar:4', 0.56
put 'position_similar', '1', 'similar:5', 0.7
put 'position_similar', '1', 'similar:6', 0.819
put 'position_similar', '1', 'similar:8', 0.28

hbase thrift start -p 9090

,之后才能连接hbase

2.1.8 用户画像构建

hbase新建表

create 'user_profile', 'basic','user_reference','env'

新建user_profile

见user_profile代码

代码可能会报hive连不到hbase的错误,导包到

cd $HIVE_HOME/lib/

上传 hive-hbase-handler-2.3.7.jar

新建hive表

drop table if exists user_profile_hbase;
create external table user_profile_hbase(
user_id STRING comment "userID",
basic map<string, String> comment "user basic information",
user_reference map<string, String> comment "user_reference",
env map<string, String> comment "user env")
COMMENT "user profile table"
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,basic:,user_reference:,env:")
TBLPROPERTIES ("hbase.table.name" = "user_profile");

这里因为版本不匹配,所以无法推进,为了节省时间,直接先略过

停止全部spark的命令

yarn application -list | grep -i spark | awk '{print $1}' | xargs -I {} yarn application -kill {}

2.1.9 召回与排序

新建hbase表

create 'lg_recall', {NAME=>'als', TTL=>1296000, VERSIONS=>999999}
alter 'lg_recall', {NAME=>'content', TTL=>1296000, VERSIONS=>999999}
alter 'lg_recall', {NAME=>'online', TTL=>1296000, VERSIONS=>999999}
# 例子:
put 'lg_recall', 'recall:user:5', 'als:1',[45,3,5,10]
put 'lg_recall', 'recall:user:5', 'als:1',[289,11,65,52,109,8]
put 'lg_recall', 'recall:user:5', 'als:2',[1,2,3,4,5,6,7,8,9,10]
put 'lg_recall', 'recall:user:2', 'content:1',[45,3,5,10,289,11,65,52,109,8]
put 'lg_recall', 'recall:user:2', 'content:2',[1,2,3,4,5,6,7,8,9,10]
create 'history_recall', {NAME=>'recall', TTL=>3888000, VERSIONS=>999999}
put 'history_recall', 'userid1', 'recall:history',[1,2,3]
put 'history_recall', 'userid1', 'recall:history',[4,5,6,7]
put 'history_recall', 'userid1', 'recall:history',[8,9,10]

新建AlsRecall文件,按用户召回

见AlsRecall代码

新建LRRank文件,按内容召回

见LRRank代码

2.1.10 推荐流程

windows环境

# 创建一个名为 'spark-env' 的新环境,使用 Python 3.7
conda create -n spark-env python=3.7# 激活新环境
conda activate spark-envconda install grpcio-tools
conda install grpcio
conda install pyspark==2.4.5
conda install happybase==1.2.0
conda install redis
# 首先切换到 E: 驱动器
E:# 然后进入目标目录
cd mysource\pyspark_test\com\abtest编译生成代码
python -m grpc_tools.protoc -I. --python_out=.. --grpc_python_out=reco.proto

新建hbase表

create 'ctr_user_feature', 'user_weigths'
create 'ctr_position_feature', 'position_weigths'

新建feature_process

见feature_process代码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/827095.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

征程 6E camera diag sample

01 功能概述 本文的 demo sample 主要描述当前 camera 相关外设诊断的当前状态,并提供自定义实现的方法及使用说明。 1.1 软件架构说明 本 sample 基于现已实现的 camera 诊断架构,libcam 内的外设诊断功能对外设硬件状态进行监测,并支持将故障状态发送给 MCU 处理,或通过事…

实时数仓及olap可视化构建(基于mysql,将maxwell改成seatunnel可以快速达成异构数据源实时同步)

1. OLAP可视化实现(需要提前整合版本)Linux121 Linux122 Linux123jupyter✔spark ✔ ✔ ✔python3+SuperSet3.0✔hive✔ClinckHouse✔Kafka ✔ ✔ ✔Phoenix ✔DataX ✔maxwell✔Hadoop ✔ ✔ ✔MySQL✔ZK ✔ ✔ ✔HBASE ✔ ✔ ✔1.1 安装Vmware,安装虚拟机集群 1.1.1 安装 …

AI运动小程序开发常见问题集锦二

截止到现在写博文时,我们的AI运动识别小程序插件已经迭代了23个版本,成功应用于健身、体育、体测、AR互动等场景;为了让正在集成或者计划进行功能扩展优化的用户,少走弯路、投入更少的开发资源,针对近期的咨询问题,我们又归集了一些常见问题,供大家参考。一、计时、计数…

synchronized的monitor监视器

public class T {@SneakyThrowspublic static void main(String[] args) {System.out.println("此行后加锁 monitorenter");synchronized (T.class){System.out.println("hello monitor");}System.out.println("此行前释放锁 monitorexit");}}反…

31 计算机安全

计算机安全是保护系统和数据的,完整,保密,可用 保密:有权限的人才能读取数据;泄露信息就是攻击保密性(看不看得到)---窃取信息 完整性:能够修改数据,知道密码进入操作-----------------------------------获取权限 可用性:有权限的人应该能随时访问,黑客发大量请求到…

大话USB PD快充电源功率“协商”

啥叫USB PD快充技术? USB PD快充技术就是通过USB接口对对USB设备进行快速充电的一项技术。 由于USB技术的发展,特别的USB TYPE-C接口的广泛应用,基于USB TYPE-C接口的USB PD快充技术越来越成为主流。 使用USB TYPE-C接口的技术可以给谁充电?可以给我们的手机充电 可以给笔记…

瑞芯微RK3568开发板Linux编译报错404怎么办触觉智能教你轻松解决

本文介绍瑞芯微RK3568主板/开发板SDK编译流程和编译报错的解决方法,使用触觉智能EVB3568鸿蒙开发板演示,具有丰富的视频输入输出接口(HDMI/eDP/MIPI/LVDS) 与多种高速接口(千兆网口/PCIe/SATA/CAN等)。近期,触觉智能即将发布RK3568系列开源鸿蒙OpenHarmony5.0系统固件,敬…

网络流建图汇总

Dining G 一个点有两重限制,将点放中间,两边分别放两个限制,中间点点拆点连 1 表示限制 CTSC1999 家园 / 星际转移问题 时间限制可以分层图,分层图不需二分,直接一层层建即可 企鹅游行 这种有限制的拆点就完了。 猪 时序问题按照时间建即可。 一般出现调整的可以考虑把调整…

UE中基于FluidFlux插件实现洪水数据接入的一种思路

这是FluidFlux插件文档链接: http://imaginaryblend.com/2021/09/26/fluid-flux/ FluidFlux插件原本可以在编辑器模式下,通过右键SimulationDomain保存模拟状态,这个模拟状态保存后是一个资产文件以及三张纹理图Ground,Height,Velocity。SimulationDomain中有一个俯视的场景…

3 有限体积法:推导方程

3 有限体积法:推导方程 基本原理和目标 (注意:这一节看不懂没关系,在后面的推导中会慢慢用到)质量、动量和能量的守恒流体的质量守恒 动量改变的速度 = 一个流体粒子上受到的力的总和(牛顿第二定律) 能量改变的速度 = 一个流体粒子吸收的热量,和作用在其上的功的总和(…

安全通道占用识别

安全通道占用识别系统利用现场已有的监控摄像头,安全通道占用识别通过先进的AI算法,对消防通道/安全通道进行实时监测。一旦监测到通道被占用、堵塞的情况,系统会立即发出告警,并通过多种方式将告警信息迅速推送给相关管理人员。这不仅极大提升了监控区域的管控效率,更为重…

【征程 6 工具链性能分析与优化-2】模型性能优化建议

01 引言为了应对低、中、高阶智驾场景,以及当前 AI 模型在工业界的应用趋势,地平线推出了征程 6 系列芯片。 在软硬件架构方面,征程 6 不仅保持了对传统 CNN 网络的高效支持能力,还强化了对 Transformer 类型网络的支持,主要表现为大幅强化了对逐点计算、数据搬运的能力。…