Android Datastore 动态创建与源码解析

涉及到的知识点

1、协程原理---->很好的博客介绍,一个小故事讲明白进程、线程、Kotlin 协程到底啥关系?
2、Channel知识点---->Android—kotlin-Channel超详细讲解
3、Coroutines : CompletableDeferred and structured concurrency

封装的DataStoreUtils工具—>gitHub

本篇博客目的

公司使用SharedPreferences容易导致ANR,调研能否使用DataStore替换公司目前的SharedPreferences解决ANR问题,所以需要先研究一下源码

目录
  • 版本引入
  • 迁移SharedPreferences数据到dataStore
  • 动态创建DataStore
  • 存储参数
  • 总结
版本引入
implementation "androidx.datastore:datastore-preferences:1.0.0"
迁移SharedPreferences数据到dataStore

既然是迁移数据,那么需要将SharedPreferences已存储的数据迁移到dataStore,所以需要先构建dataStore。
目前网上构建迁移DataStore的案例Demo如下

//迁移使用
private val Context.dataStore: DataStore<Preferences> by preferencesDataStore(name = "userSharePreFile",produceMigrations = { context ->listOf(SharedPreferencesMigration(context,"userSharePreFile"))}
)//或 
//这种构建DataStore写法是alpha版本有的,在1.0.0版本就找不到了
var dataStore: DataStore<Preferences> = context.createDataStore(name = "userSharePreFile")
//或
//直接构建
private val Context.dataStore: DataStore<Preferences> by preferencesDataStore(name = "userSharePreFile"
)

上面3种写法都是对Context进行扩展创建的DataStore,所以上面创建的方式,都有一个缺点,就是需要提前知道name才能创建,如果你之前创建SharedPreferences的方式,是通过外部传递进来name构建的话,上面直接创建DataStore方式就显然不适合你了。

翻阅旧版本(alpha版本)源码,一探究竟如何构建DataStore
//alpha版本构建方式
var dataStore: DataStore<Preferences> = context.createDataStore(name = "userSharePreFile")fun Context.createDataStore(name: String,corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,//①migrations: List<DataMigration<Preferences>> = listOf(),//②scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
): DataStore<Preferences> =PreferenceDataStoreFactory.create(//③produceFile = {File(this.filesDir, "datastore/$name.preferences_pb")},corruptionHandler = corruptionHandler,migrations = migrations,scope = scope)

可以明显看到是使用PreferenceDataStoreFactory.create返回DataStore
① 是构建需要迁移SharedPreferences文件名称
② 指明协程是在IO运行
③ 新文件存储的位置
再看看另外一种通过 by preferencesDataStore 创建DataStore方式

private val Context.dataStore: DataStore<Preferences> by preferencesDataStore(name = "userSharePreFile"
)public fun preferencesDataStore(name: String,corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,//①produceMigrations: (Context) -> List<DataMigration<Preferences>> = { listOf() },//②scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
): ReadOnlyProperty<Context, DataStore<Preferences>> {return PreferenceDataStoreSingletonDelegate(name, corruptionHandler, produceMigrations, scope)
}internal class PreferenceDataStoreSingletonDelegate internal constructor(private val name: String,private val corruptionHandler: ReplaceFileCorruptionHandler<Preferences>?,private val produceMigrations: (Context) -> List<DataMigration<Preferences>>,private val scope: CoroutineScope
) : ReadOnlyProperty<Context, DataStore<Preferences>> {private val lock = Any()@GuardedBy("lock")@Volatileprivate var INSTANCE: DataStore<Preferences>? = nulloverride fun getValue(thisRef: Context, property: KProperty<*>): DataStore<Preferences> {return INSTANCE ?: synchronized(lock) {if (INSTANCE == null) {val applicationContext = thisRef.applicationContextINSTANCE = PreferenceDataStoreFactory.create(corruptionHandler = corruptionHandler,migrations = produceMigrations(applicationContext),scope = scope) {applicationContext.preferencesDataStoreFile(name)}}INSTANCE!!}}
}//文件存储位置
public fun Context.preferencesDataStoreFile(name: String): File =this.dataStoreFile("$name.preferences_pb")

题外话:这里有利用kotlin委托属性by关键字语法
① 需要迁移的SharedPreferences文件
② 协程运行在IO

可以看出旧版本(alpha) 与 by preferencesDataStore 2种方案,都最终通过PreferenceDataStoreFactory.create,返回DataStore,我们就继续再看看PreferenceDataStoreFactory.kt的具体实现逻辑

//PreferenceDataStoreFactory.ktpublic fun create(corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,//迁移的share文件集合migrations: List<DataMigration<Preferences>> = listOf(),//IOscope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),//dataStore文件存储的目录位置produceFile: () -> File ): DataStore<Preferences> {val delegate = DataStoreFactory.create(//创建SingleProcessDataStoreserializer = PreferencesSerializer,corruptionHandler = corruptionHandler,migrations = migrations,scope = scope) {//省略代码} //传入SingleProcessDataStorereturn PreferenceDataStore(delegate)}//这里有主动的去调用updateData 方法,如果不去主动调用,就不会触发迁移的逻辑
//下文的扩展函数DataStore<Preferences>.edit会说到这里
internal class PreferenceDataStore(private val delegate: DataStore<Preferences>) :DataStore<Preferences> by delegate {override suspend fun updateData(transform: suspend (t: Preferences) -> Preferences):Preferences {return delegate.updateData {val transformed = transform(it)(transformed as MutablePreferences).freeze()transformed}}
}

继续看DataStoreFactory.create

//DataStoreFactory.kt
fun <T> create(produceFile: () -> File,serializer: Serializer<T>,corruptionHandler: ReplaceFileCorruptionHandler<T>? = null,migrations: List<DataMigration<T>> = listOf(),scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())): DataStore<T> =//找到最终创建的类SingleProcessDataStore(produceFile = produceFile,serializer = serializer,corruptionHandler = corruptionHandler ?: NoOpCorruptionHandler(),initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations)),scope = scope)

到目前为止已经知道真相了,最终是通过SingleProcessDataStore返回DataStore。

下面我们通过一张图片来小结一下,旧版本alpha版本的创建与新版本 by preferencesDataStore的调用逻辑链

DataStore.jpg

好,已经知道这么多了,那么我们就开始动态构建DataStore

动态创建DataStore
 fun preferencesMigrationDataStore(sharedPreferName: String) {val dataStore = PreferenceDataStoreFactory.create(corruptionHandler =  ReplaceFileCorruptionHandler<Preferences>(produceNewData = { emptyPreferences() }),//需要迁移的sharePrefer文件的名称migrations = listOf(SharedPreferencesMigration(mContext, sharedPreferName)),//IOscope = CoroutineScope(Dispatchers.IO + SupervisorJob())) {//dataStore文件名称mContext.preferencesDataStoreFile(sharedPreferName)}runBlocking {//必须要执行这行代码,否是不会走迁移的逻辑dataStore.updateData {it.toPreferences()}}}

migrations:表示你要迁移的sharedPreference文件
scope :表示写数据是在IO
执行完上述代码后,.xml就会消失,然后会在files目录下多出一个/datastore/xxx.preferences_pb文件
切勿重复对某个SharedPreferences执行文件迁移方案,否则会报错。比如你前一秒在执行迁移,后一秒又继续执行迁移
SharedPrefs.png
dataStore_migrate.jpg

####存储参数

/*** @key 参数* @value 具体的值*/private fun putInt(key:String, value: Int) {runBlocking {dataStore.edit {//①it[intPreferencesKey(key)] = value}}}
//类似的还有如下,这些都是google提供的参数
intPreferencesKey
doublePreferencesKey
stringPreferencesKey
....

看①详情,点击edit,发现他是一个扩展函数

public suspend fun DataStore<Preferences>.edit(transform: suspend (MutablePreferences) -> Unit
): Preferences {return this.updateData {//调用的是PreferenceDataStore.updateData()//it.toMutablePreferences() 返回类似mapit.toMutablePreferences().apply { transform(this) }}
}

transform 就是调用者{}里面的内容,接下来我们看看 PreferenceDataStore 类的代码

//由前部分的代码,可以得知,delegate = SingleProcessDataStore 
internal class PreferenceDataStore(private val delegate: DataStore<Preferences>) :DataStore<Preferences> by delegate {override suspend fun updateData(transform: suspend (t: Preferences) -> Preferences):Preferences {return delegate.updateData {//调用SingleProcessDataStore.updateData //返回给上一个{}也就是  it.toMutablePreferences().apply { transform(this) }val transformed = transform(it)(transformed as MutablePreferences).freeze()transformed //拿到用户的需要更改的内容数据}}
}

代码里调用了delegate.updateData(), 所以继续看SingleProcessDataStore的updateData

SingleProcessDataStore.ktoverride suspend fun updateData(transform: suspend (t: T) -> T): T {val ack = CompletableDeferred<T>()val currentDownStreamFlowState = downstreamFlow.value//协程体封装进Message.Update,coroutineContext 是协程的上下文,就是我们的 runBlocking 启动的线程,我这里是mainval updateMsg = Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)//对消息进行分发,他的类是 SimpleActoractor.offer(updateMsg)//这里会拿到Preferences,如何拿?后面会有一个update.ack.completeWith方法,会返回回来return ack.await()}
internal class SimpleActor<T>(private val scope: CoroutineScope,//Dispatchers.IO + SupervisorJob()onComplete: (Throwable?) -> Unit,onUndeliveredElement: (T, Throwable?) -> Unit,private val consumeMessage: suspend (T) -> Unit
) {private val messageQueue = Channel<T>(capacity = UNLIMITED)private val remainingMessages = AtomicInteger(0)//......  省去//这里就是将刚刚封装的消息体,添加进这里fun offer(msg: T) {check(//发送封装的消息体messageQueue.trySend(msg).onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") }.isSuccess)if (remainingMessages.getAndIncrement() == 0) {scope.launch {check(remainingMessages.get() > 0)do {// scope = Dispatchers.IO + SupervisorJob()scope.ensureActive()//取出封装的消息体,然后进行任务处理consumeMessage(messageQueue.receive())} while (remainingMessages.decrementAndGet() != 0)}}}
}

tip:这里有利用Channel进行协程通信,Channel是可以处理并发的情况
到这里,我们可以知道,我们由runBlocking(main主线程) 协程 到 Dispatchers.IO的任务分发

private val actor = SimpleActor<Message<T>>(scope = scope,// CoroutineScope(Dispatchers.IO + SupervisorJob())onComplete = {//.....省略},onUndeliveredElement = { msg, ex ->//.....省略) { msg ->//处理分发的任务,msg 为刚刚封装的updateMsg when (msg) { is Message.Read -> {//读取handleRead(msg)}is Message.Update -> {//更新handleUpdate(msg)}}}
 private suspend fun handleUpdate(update: Message.Update<T>) {update.ack.completeWith(runCatching {when (val currentState = downstreamFlow.value) {is Data -> {//写数据到filetransformAndWrite(update.transform, update.callerContext)}is ReadException, is UnInitialized -> {if (currentState === update.lastState) {           //读取file文件      ①          readAndInitOrPropagateAndThrowFailure()//写数据到file       ②transformAndWrite(update.transform, update.callerContext)} else {throw (currentState as ReadException).readException}}is Final -> throw currentState.finalException // won't happen}})}

第一次使用 downstreamFlow.value = UnInitialized 。
这里要注意一下update.ack.completeWith这个函数,他是拿到结果成功返回

这里再次展示出来,是告诉大家,在哪里会等待结果返回override suspend fun updateData(transform: suspend (t: T) -> T): T {val ack = CompletableDeferred<T>()val currentDownStreamFlowState = downstreamFlow.valueval updateMsg =Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)actor.offer(updateMsg)return ack.await() //这里就是等待 update.ack.completeWith的结果返回,所以如果不加这行,是不会卡主线程的}

所以使用runBlocking是会卡主线程的,如果你还有UI刷新情况,严重的情况会导致ANR问题

不扯之前的了,我们继续继续,看① 的读取

 private suspend fun readAndInitOrPropagateAndThrowFailure() {try {readAndInit()} catch (throwable: Throwable) {downstreamFlow.value = ReadException(throwable)throw throwable}}private suspend fun readAndInit() {check(downstreamFlow.value == UnInitialized || downstreamFlow.value is ReadException)//这个是锁,协程里面专有的,详情可以看 https://www.kotlincn.net/docs/reference/coroutines/shared-mutable-state-and-concurrency.htmlval updateLock = Mutex()//读取dataStore文件var initData = readDataOrHandleCorruption()var initializationComplete: Boolean = false//这里就是shareprefence转dataStoreval api = object : InitializerApi<T> {override suspend fun updateData(transform: suspend (t: T) -> T): T {return updateLock.withLock() {if (initializationComplete) {throw IllegalStateException("InitializerApi.updateData should not be " +"called after initialization is complete.")}//transform里面就是去迁移数据的方法val newData = transform(initData)//这里有做,新 旧值比较,如果不同,就去写入if (newData != initData) {//写文件writeData(newData)initData = newData}initData}}}//initTasks 里面装的就是需要转换的 SharedPreferences集合initTasks?.forEach { it(api) }initTasks = nullupdateLock.withLock {initializationComplete = true}//这里有将迁移完成后的数据,存储在flow.value里面downstreamFlow.value = Data(initData, initData.hashCode())}//读取dataStore文件
private suspend fun readDataOrHandleCorruption(): T {try {return readData()} catch (ex: CorruptionException) {val newData: T = corruptionHandler.handleCorruption(ex)try {writeData(newData)} catch (writeEx: IOException) {ex.addSuppressed(writeEx)throw ex}return newData}}private suspend fun readData(): T {try {FileInputStream(file).use { stream ->return serializer.readFrom(stream)}} catch (ex: FileNotFoundException) {if (file.exists()) {throw ex}return serializer.defaultValue}}

file就是我们存储的dataStore,目录是在 “datastore/$name.preferences_pb”

看完了①,再来看看② 写入数据到file,写数据的方法是 transformAndWrite()

//....
transformAndWrite(update.transform, update.callerContext)
//...private suspend fun transformAndWrite(//来源于 Message.Update.transform封装transform: suspend (t: T) -> T,//来源于 Message.Update.callerContext封装callerContext: CoroutineContext): T {val curDataAndHash = downstreamFlow.value as Data<T>curDataAndHash.checkHashCode()val curData = curDataAndHash.value//这里callerContext  就是我们的 runBlocking,main(主线程)//这里是将旧的值给回调用者,然后从调用者获取到新参数val newData = withContext(callerContext) { transform(curData) }curDataAndHash.checkHashCode()//这里有做数据比较return if (curData == newData) {curData} else {//写入数据writeData(newData)//保存到flow.value里面downstreamFlow.value = Data(newData, newData.hashCode())newData}}private val SCRATCH_SUFFIX = ".tmp"
//写入数据
internal suspend fun writeData(newData: T) {file.createParentDirectories()//这里创建出来的文件是"datastore/$name.preferences_pb.tmp"val scratchFile = File(file.absolutePath + SCRATCH_SUFFIX)try {FileOutputStream(scratchFile).use { stream ->serializer.writeTo(newData, UncloseableOutputStream(stream))stream.fd.sync()}//重新命名回去file,这里的file是我们目标的文件dataStore名称if (!scratchFile.renameTo(file)) {//重新命名失败,抛出异常throw IOException("Unable to rename $scratchFile." +"This likely means that there are multiple instances of DataStore " +"for this file. Ensure that you are only creating a single instance of " +"datastore for this file.")}} catch (ex: IOException) {if (scratchFile.exists()) {scratchFile.delete() }throw ex}}

到此,更新值的操作,我们已经全部走完了流程

总结

1、文件的写入是发生在IO层面
2、使用runBlocking是会卡主线程,如果此时存在需要刷新UI的情况,严重会ANR


/*** @key 参数* @value 具体的值*/private fun putInt(key:String, value: Int) {runBlocking {dataStore.edit {it[intPreferencesKey(key)] = value}}}public suspend fun DataStore<Preferences>.edit(transform: suspend (MutablePreferences) -> Unit
): Preferences {return this.updateData {it.toMutablePreferences().apply { transform(this) }}
}//更新逻辑private suspend fun handleUpdate(update: Message.Update<T>) {update.ack.completeWith(//通知结果回调//.....省去)}//transform 就是上面的{}里面的内容override suspend fun updateData(transform: suspend (t: T) -> T): T {val ack = CompletableDeferred<T>()val currentDownStreamFlowState = downstreamFlow.valueval updateMsg =Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)actor.offer(updateMsg)return ack.await() //这里就是等待 update.ack.completeWith的结果返回,所以如果不加这行,是不会卡主线程的//题外话不加ack.await() 代码也会执行}

所以,可以考虑使用withContext(IO){读取/更新等待操作}

3、更新参数的时候,是会跟旧的值比较,如果值相同就不写入了,否则就写入到文件里面,并且更新flow.value的值

 return if (curData == newData) {curData} else {writeData(newData)downstreamFlow.value = Data(newData, newData.hashCode())newData}

4、解决并发问题,使用channel解决协程之间沟通与并发,单线程的IO更新文件与并发

5、如果已将SharedPreference迁移到DataStore,你就不要继续使用SharedPreferences了,如果继续使用SharedPreferences,会与DataStore的值不同了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/162778.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Chirp窄带扩频技术的无线混合组网应用,以多角色智能计量插座作为Chirp广域基站,构建边缘计算混合无线网络

随着物联网&#xff08;IoT&#xff09;的不断发展&#xff0c;无线通信技术的需求也在不断增加。Chirp窄带扩频技术是一种具有广泛应用潜力的无线通信技术&#xff0c;它在低功耗、广域覆盖、抗干扰等方面具备独特的优势。本文介绍了如何利用磐启微Chirp技术构建ECWAN无线混合…

卡尔曼滤波之二:Python实现

卡尔曼滤波之二&#xff1a;Python实现 1.背景描述2.构建卡尔曼滤波公式2.1 预测2.2 更新 3.代码实现3.1 输入值3.2 pykalman包实现3.3 不使用Python包实现3.4 效果可视化 参考文献 了解了卡尔曼滤波之一&#xff1a;基本概念&#xff0c;可以结合代码来理解下卡尔曼滤波的2个预…

【数据库】数据库模式 Schema

数据库模式 Schema 1.MySQL2.PostgreSQL3.SQL Server4.Oracle5.SQLite 在数据库的术语中&#xff0c;模式&#xff08;schema&#xff09;是一个逻辑概念&#xff0c;用于组织数据库中的对象。模式中的对象通常包括 表、索引、数据类型、序列、视图、存储过程、主键、外键 等等…

Electron[3] 基础配置准备和Electron入门案例

1 背景 上一篇文章已经分享了&#xff0c;如何准备Electron的基础环境了。但是博客刚发才一天&#xff0c;就发现有人问问题了。经过实践发现&#xff0c;严格按照作者的博客教程走是不会有问题的&#xff0c;其中包括安装的环境版本等都要一致。因为昨天发的博客&#xff0c;…

【论文阅读】Equivariant Contrastive Learning for Sequential Recommendation

【论文阅读】Equivariant Contrastive Learning for Sequential Recommendation 文章目录 【论文阅读】Equivariant Contrastive Learning for Sequential Recommendation1. 来源2. 介绍3. 前置工作3.1 序列推荐的目标3.2 数据增强策略3.3 序列推荐的不变对比学习 4. 方法介绍4…

kafka问题汇总

报错1&#xff1a; 解决方式 1、停止docker服务   输入如下命令停止docker服务 systemctl stop docker 或者service docker stop1   停止成功的话&#xff0c;再输入docker ps 就会提示出下边的话&#xff1a; Cannot connect to the Docker daemon. Is the docker daem…

汽车标定技术(四)--问题分析:多周期测量时上位机显示异常

目录 1.问题现象 2.数据流分析 ​​​​3.代码分析 3.1 AllocDAQ 3.2 AllocOdt 3.3 AllocOdtEntry 4.根因分析及解决方法 4.1 根因分析 4.2 解决方案 1.问题现象 在手撸XCP代码时&#xff0c; DAQ的实现是一大头痛的事情。最初单周期实现还好一点&#xff0c;特别是…

聊一聊关于手机Charge IC的电流流向

关于手机Charge&#xff0c;小白在以前的文章很少讲&#xff0c;一是这部分东西太多&#xff0c;过于复杂。二是总感觉写起来欠缺点什么。但后来想一想&#xff0c;本是抱着互相学习来写文章的心理态度&#xff0c;还是决定尝试写一些。 关于今天要讲的关于手机Charge的内容&a…

Visual Studio Code 常用快捷键大全

Visual Studio Code 常用快捷键大全 快捷键是编码过程中经常使用&#xff0c;且能够极大提升效率的部分&#xff0c;这里给大家介绍一些VS Code中非常有用的快捷键。 打开和关闭侧边栏 Mac — Command B Windows — Ctrl B Ubuntu — Ctrl B 选择单词 Mac — Command D …

本地电脑监控软件

本地电脑监控软件是一种用于监控计算机使用行为的软件&#xff0c;它可以帮助企业管理者了解员工的工作状态和行为&#xff0c;保护企业的计算机资源。 本地电脑监控软件可以监控员工的计算机使用行为&#xff0c;包括屏幕监控、键盘记录、文件操作等。这些功能可以帮助企业管理…

FFmpeg直播能力更新计划与新版本发布

// 编者按&#xff1a;客户端作为直接面向用户大众的接口&#xff0c;随着技术的发展进化与时俱进&#xff0c;实现更好的服务是十分必要的。FFmpeg作为最受欢迎的视频和图像处理开源软件&#xff0c;被相关行业的大量用户青睐&#xff0c;而随着HEVC标准的发布到广泛使用&am…

ClickHouse开发系列

一、 ClickHouse详解、安装教程_clickhouse源码安装 二、ClickHouse 语法详解_clickhouse讲解 三、ClickHouse SQL 操作语句详解 四、ClickHouse 高级教程—官方原版 五、ClickHouse主键索引最佳实践 六、MySQL与ClickHouse集成 七、ClickHouse 集成MongoDB、Re…