1、使用场景
当开发复杂报表,需要处理大量数据,不管怎么优化计算和查询语句,程序的运行效率还是达不到用户要求,怎么办?
为了解决这个问题,就需要程序实现并行处理。
本文档就是通过异步调用远程RFC的办法,实现对大量数据的计算,以并行的方式,更快的计算出最终结果。
2、代码实现
在实现并行处理时,首先要看系统当前能并行的资源数
"--------------------@斌将军--------------------
"获取服务器标识
CALL 'C_SAPGPARAM'ID 'NAME' FIELD 'rdisp/myname'ID 'VALUE' FIELD gv_applserver."获取登录/服务器组名称
SELECT SINGLEclassname
FROM rzllitab
INTO gv_classname "Server Group Name
WHERE applserver = gv_applserverAND grouptype = 'S' . "S:服务器组,空:登陆组CALL FUNCTION 'SPBT_INITIALIZE'EXPORTINGgroup_name = gv_classnameIMPORTINGmax_pbt_wps = gv_total "可用资源总数free_pbt_wps = gv_available "空闲资源数EXCEPTIONSinvalid_group_name = 1internal_error = 2pbt_env_already_initialized = 3currently_no_resources_avail = 4no_pbt_resources_found = 5cant_init_different_pbt_groups = 6OTHERS = 7.
"--------------------@斌将军--------------------
将逻辑处理封装为远程RFC,比如现在要计算1000行数据,每行数据乘以一百万次循环的累加数,然后展示出结果。远程RFC的计算逻辑如下:
"--------------------@斌将军--------------------
FUNCTION ytest001_001.
*"----------------------------------------------------------------------
*"*"本地接口:
*" IMPORTING
*" VALUE(I_INDEX) TYPE I
*" VALUE(I_COUNT) TYPE I
*" EXPORTING
*" VALUE(E_INDEX) TYPE I
*" VALUE(E_COUNT) TYPE I
*"----------------------------------------------------------------------DO 1000000 TIMES.i_count = i_count + 1.ENDDO.i_count = i_count * i_index.e_index = i_index.e_count = i_count.ENDFUNCTION.
"--------------------@斌将军--------------------
在主程序中调用远程RFC,并且添加远程调用的系统报错异常communication_failure和system_failure
"--------------------@斌将军--------------------
CALL FUNCTION 'YTEST001_001' STARTING NEW TASK gv_tasknameDESTINATION IN GROUP gv_classnamePERFORMING frm_ytest ON END OF TASK "调用每条线程的处理方法,接收处理结果EXPORTINGi_index = ps_alv-indexi_count = ps_alv-countEXCEPTIONScommunication_failure = 1 MESSAGE lv_messagesystem_failure = 2 MESSAGE lv_messageresource_failure = 3.
"--------------------@斌将军--------------------
并在主程序中接收远程RFC的返回消息
"--------------------@斌将军--------------------
"返回消息处理
FORM frm_ytest USING taskname.DATA:ls_alv TYPE ty_alv."接收处理数据的返回消息RECEIVE RESULTS FROM FUNCTION 'YTEST001_001'IMPORTINGe_index = ls_alv-indexe_count = ls_alv-countEXCEPTIONScommunication_failure = 1 MESSAGE lv_messagesystem_failure = 2 MESSAGE lv_message.MODIFY gt_alv FROM ls_alv TRANSPORTING count WHERE index = ls_alv-index."已完成进程 + 1gv_end_jobs = gv_end_jobs + 1.
ENDFORM.
"--------------------@斌将军--------------------
完整参考代码:
"--------------------@斌将军--------------------
TYPES:BEGIN OF ty_alv,index TYPE i,count TYPE i,END OF ty_alv.DATA:gt_alv TYPE TABLE OF ty_alv,gs_alv TYPE ty_alv.DATA:gv_open_jobs TYPE i, "开启的进程gv_jobs TYPE i, "可用的进程gv_end_jobs TYPE i. "结束的进程DATA:gv_applserver TYPE rzlli_asvr, "服务器标识 实例名称(用于登录/服务器组分配)gv_classname TYPE rzlli_apcl, "登录/服务器组名称gv_taskname TYPE char10, "进程名称gv_init_flag TYPE char1,gv_total TYPE i, "可用资源总数gv_available TYPE i. "空闲资源数DATA:lv_count TYPE i,lv_flat TYPE p DECIMALS 2,lv_message TYPE char200."准备1000行测试数据
DO 1000 TIMES.gs_alv-index = sy-index.gs_alv-count = 1.APPEND gs_alv TO gt_alv.
ENDDO."获取服务器标识
PERFORM frm_get_server."获取服务器组对应的可用资源数
PERFORM frm_get_jobs_available."处理每行数据,实际应用时,看如何将数据“分批”
CLEAR:lv_count.
LOOP AT gt_alv INTO gs_alv.lv_count = lv_count + 1."进程任务计数器IF gv_open_jobs - gv_end_jobs = gv_jobs."已开进程 - 已结束进程 = 可用进程WAIT UNTIL gv_open_jobs - gv_end_jobs < gv_jobs."等待 已开进程 - 已结束进程 < 可用进程ENDIF."拼接进程名称gv_taskname = 'Task' && lv_count.CONDENSE gv_taskname."逻辑处理PERFORM frm_deal_task USING gs_alv.CLEAR:gs_alv.
ENDLOOP."等待所有的进程执行完毕
WAIT UNTIL gv_end_jobs >= gv_open_jobs."展示结果
CALL METHOD cl_demo_output=>displayEXPORTINGdata = gt_alv.*&---------------------------------------------------------------------*
*& Form FRM_GET_SERVER
*&---------------------------------------------------------------------*
* text 获取服务器标识
*----------------------------------------------------------------------*
FORM frm_get_server."获取服务器标识CALL 'C_SAPGPARAM'ID 'NAME' FIELD 'rdisp/myname'ID 'VALUE' FIELD gv_applserver."获取登录/服务器组名称SELECT SINGLEclassnameFROM rzllitabINTO gv_classname "Server Group NameWHERE applserver = gv_applserverAND grouptype = 'S' . "S:服务器组,空:登陆组ENDFORM.*&---------------------------------------------------------------------*
*& Form FRM_GET_JOBS_AVAILABLE
*&---------------------------------------------------------------------*
* text 获取服务器组对应的可用资源数
*----------------------------------------------------------------------*
FORM frm_get_jobs_available.gv_jobs = 0."可用资源数IF gv_init_flag IS INITIAL."第一次获取资源"资源查询 - 获取最多jobs 个数CALL FUNCTION 'SPBT_INITIALIZE'EXPORTINGgroup_name = gv_classnameIMPORTINGmax_pbt_wps = gv_total "可用资源总数free_pbt_wps = gv_available "空闲资源数EXCEPTIONSinvalid_group_name = 1internal_error = 2pbt_env_already_initialized = 3currently_no_resources_avail = 4no_pbt_resources_found = 5cant_init_different_pbt_groups = 6OTHERS = 7.CASE sy-subrc.WHEN 0.lv_flat = gv_available * 9 / 10.gv_jobs = floor( lv_flat ). "拿其中一部分的空闲资源数来执行IF gv_jobs = 0 AND gv_available = 1.gv_jobs = 1.ENDIF.gv_init_flag = 'X'."按照jobs来进行数据拆分WHEN 1.
* MESSAGE E836."未定义 PBT 服务器组WHEN 2.WHEN 3.
* MESSAGE E833."已为组初始化了 PBT 环境WHEN 4.
* MESSAGE E837."所有服务器正忙:没有可用的资源WHEN 5.WHEN 6.ENDCASE."刷新资源数量ELSE."第二次获取资源调用函数CALL FUNCTION 'SPBT_GET_CURR_RESOURCE_INFO'IMPORTINGmax_pbt_wps = gv_totalfree_pbt_wps = gv_availableEXCEPTIONSinternal_error = 1pbt_env_not_initialized_yet = 2OTHERS = 3.CASE sy-subrc .WHEN 0.lv_flat = gv_available * 9 / 10.gv_jobs = floor( lv_flat ). "拿其中一部分的空闲资源数来执行IF gv_jobs = 0 AND gv_available = 1.gv_jobs = 1.ENDIF.WHEN 1.WHEN 2.CLEAR gv_init_flag.PERFORM frm_get_jobs_available. "获取失败则递归调用WHEN 3.ENDCASE.ENDIF.
ENDFORM. " FRM_GET_JOBS_AVAILABLE*&---------------------------------------------------------------------*
*& Form FRM_DEAL_TASK
*&---------------------------------------------------------------------*
* text 逻辑处理
*----------------------------------------------------------------------*
FORM frm_deal_task USING ps_alv TYPE ty_alv."调用需要并行执行的函数,此函数必须为远程函数CALL FUNCTION 'YTEST001_001' STARTING NEW TASK gv_tasknameDESTINATION IN GROUP gv_classnamePERFORMING frm_ytest ON END OF TASK "调用每条进程的处理方法,接收处理结果EXPORTINGi_index = ps_alv-indexi_count = ps_alv-countEXCEPTIONScommunication_failure = 1 MESSAGE lv_messagesystem_failure = 2 MESSAGE lv_messageresource_failure = 3.IF sy-subrc = 0.gv_open_jobs = gv_open_jobs + 1. "开启进程成功,则已开启变量 + 1ELSEIF sy-subrc = 3.WAIT UP TO '0.1' SECONDS.WAIT UNTIL gv_open_jobs - gv_end_jobs < gv_jobs."等待 已开进程 - 已结束进程 < 可用进程PERFORM frm_deal_task USING ps_alv."递归调用,重复开启进程ELSE.
* WRITE: sy-index,sy-subrc,lv_message.ENDIF.
ENDFORM.*&---------------------------------------------------------------------*
*& Form FRM_YTEST
*&---------------------------------------------------------------------*
* text 返回消息处理
*----------------------------------------------------------------------*
FORM frm_ytest USING taskname.DATA:ls_alv TYPE ty_alv."接收处理数据的返回消息RECEIVE RESULTS FROM FUNCTION 'YTEST001_001'IMPORTINGe_index = ls_alv-indexe_count = ls_alv-countEXCEPTIONScommunication_failure = 1 MESSAGE lv_messagesystem_failure = 2 MESSAGE lv_message.MODIFY gt_alv FROM ls_alv TRANSPORTING count WHERE index = ls_alv-index."已完成进程 + 1gv_end_jobs = gv_end_jobs + 1.
ENDFORM.
"--------------------@斌将军--------------------
3、效率对比
常规处理方式的代码:
"--------------------@斌将军--------------------
LOOP AT gt_alv INTO gs_alv.CALL FUNCTION 'YTEST001_001'EXPORTINGi_index = gs_alv-indexi_count = gs_alv-countIMPORTINGe_index = gs_alv-indexe_count = gs_alv-count.MODIFY gt_alv FROM gs_alv TRANSPORTING count WHERE index = gs_alv-index.
ENDLOOP.
"--------------------@斌将军--------------------
耗时对比
并行处理耗时1.7秒,常规处理耗时172秒,效果立竿见影
定期更文,欢迎关注