I帧验证
MoMo Lv5

I帧验证

背景

设备中装有的模型为避免占用太多内存导致性能比较一般,有较多告警被误识别,而APP内部部署的模型精度较高,所以设计了当设备产生告警时把告警发送到手机端进行I帧验证来增加告警的准确性。
由sdk循环调用得到设备端的的数据,如果有人物移动等视频端将当前帧的时间戳、byteArray和设备ID传给APP端,APP端收到回调启动I帧验证,目的是更准确的告警实际事件

image

整体流程图如上。

这个模块涉及的技术栈:RxJava、JNI

这个模块嵌入在IPC模块中,在IPC页面中,会触发该设备的I帧验证,同时经过验证的结果也会经过流式返回并动态添加到告警列表中。

准确性: 由于APP内部会集成部署一些模型,该模型经过调参之后的结果比较准确,所以能够很好的保证设备告警的准确性。

实时性: 整个流程从从设备侧拉取I帧开始,在设备端解码再将图片传入模型进行计算,至最终得到结果并将结果标签返回至SDK中记录结束。APP内部的计算时间为平均500-700ms左右,整个流程平均为1000ms左右,达到产生告警即计算,算完之后则展示的效果,能够达到实时性的要求。

流程

  • 首先进入IPC(设备详情页)后,会通过触发triggerComputeForDevice(deviceID:String)这个方法,并在SDK层传入RxJava对象作为一个上下文context,同时传入编写好的JNI回调方法接收告警流式返回的结果。
  • 当设备告警产生时,SDK从设备侧拉取I帧数据,通过JNI调用业务回调方法并传入数据(dataArray,oriLabels)
  • I帧验证方法:首先将传入的I帧数据解码成yuvImage再转成bitmap,然后创建I帧验证管线,传入bitmap进行处理并得到结果标签
  • 对标签结果进行转化并返回
  • 对bitmap进行压缩,通过DiskLruCache缓存,用于告警展示时的封面

I帧验证管线:通过TFLite加载模型,加载完成后接收特定格式的数据(需要对bitmap进行转化)进行计算推理,对得到的结果读取转换,最终得到结果中是否有我们想要的标签(如人脸、人形、宠物等)

SDK中创建方法

问题难点

  • JNI代码的编写需要学习了解 -> 仿照其他JNI方法进行编写,同时了解相关知识
  • I帧验证的管线的生命周期(因为涉及算法模型,彼时占用内存较大,所以就需要确定什么时候释放) -> 采用工厂模式+计时来确定管线对象的生存周期(通过锁实现管线对象的正常持续存在,防止定时任务中另一线程的操作导致管线对象获取后释放),每次方法调用时都从Object类中获取,
    • 首先加锁,如果管线对象存在则取消计时器,直接返回对象;
    • 如果管线对象已经释放,则解锁,重新创建管线对象并返回。同时重置计时器。
    • 当一段时间(设置为15s)没有获取管线对象时,则执行定时任务,释放管线对象,释放内存资源。计时器采用的是Timer()+TimeTask(),先加锁,再释放管线对象,再解锁
  • 方法调用的同步即I帧验证管线添加锁 -> 预防SDK异步多次调用同一个方法导致计算的结果混乱

内存优化问题

  • 存在过内存泄漏 原因是因为利用FFMPEG解码时,工具类对象重复创建没有释放导致
  • 性能分析 添加log计算每一个步骤可能需要的时间,如FFmpeg解码时间,yuv转bitmap时间,管线推理时间等,

具体步骤

  1. ByteArray转为YuvImage yuv=FFDecoder().use { it.decodePacket(data.clone()) }

  2. YuvImage转bitmap

    1
    2
    3
    4
    5
    6
    7
    val bitmap: Bitmap
    ByteArrayOutputStream().let {
    yuv.compressToJpeg(Rect(0, 0, yuv.width, yuv.height), 100, it)
    val jpegData = it.toByteArray()
    bitmap = BitmapFactory.decodeByteArray(jpegData, 0, jpegData.size)
    it.close()
    }
  3. 创建管线

    • 如果对象已经创建好,则直接返回使用,如果没有该对象,则创建后返回;同时添加倒计时,一段时间不使用则释放对象

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      fun getAlarmPipeline(ai: IntelligenceRepository, logger: Logger): LabelPipeline<Bitmap>? {
      alarmPipeLineLock.lock() // 通过锁实现管线对象的正常持续存在,防止定时任务中另一线程的操作导致管线对象获取后释放
      return if (deviceAlarmPipeLine != null) {
      cancelTimer()
      deviceAlarmPipeLine
      } else {
      alarmPipeLineLock.unlock()
      init(ai, logger)
      }
      }
  4. I帧验证缩放压缩缓存

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    val matrix = Matrix()
    val scale = FRAME_MIN_WIDTH / bitmap.width
    matrix.setScale(scale, scale)
    val scaled = Bitmap.createBitmap(
    bitmap, 0, 0, bitmap.width, bitmap.height, matrix, true
    )
    val scaledByteArray = ByteArrayOutputStream().use {
    scaled.compress(Bitmap.CompressFormat.JPEG, 40, it)
    it.flush()
    return@use it.toByteArray()
    }
    runBlocking {
    diskLruCacheOfFrameOfAlarm.put(
    deviceId, timestamp, ByteArrayInputStream(scaledByteArray), logger
    )
    val uri = diskLruCacheOfFrameOfAlarm.get(deviceId, timestamp)?.also {
    sdk.currentComputedThumbSubject().onNext(it) // 用于获取全局单例的已经被计算了的最新的图像
    sdk.reportKeyframeComputed() // 上报已计算关键帧+温度检测控制
    }
    logger.i(TAG, "computeOriAlarmKeyFrame, cache uri=$uri")
    }
  5. 启动yolov8和centerFace进行计算

    1
    val result = deviceAlarmPipeLine.process(bitmap, metaData = LabeledObject.MetaData()).flatMap { Single.just(it.labels) }.blockingGet()
  6. 返回结果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    // 人脸结果
    val alarmKeyFrameFaceDetectionResult = result[ALARM_KEY_FRAME_FACE_COMPUTE_RESULT] as? Boolean
    // 人物宠物计算结果
    val alarmKeyFrameDetectionResult = result[ALARM_KEY_FRAME_YOLO_COMPUTE_RESULT] as? List<Yolo8DetectorOnlyKeyFrame.Detection>

    val resultList =
    if (alarmKeyFrameFaceDetectionResult == null || !alarmKeyFrameFaceDetectionResult) {
    if (alarmKeyFrameDetectionResult.isNullOrEmpty()) {
    // 没有检测到任何标签,置为画面变化 detect_move fixme:这里只是不确定,如果是确定的空告警,应该删除
    listOf(Label.DETECT_MOVE.key)
    } else {
    val detection = alarmKeyFrameDetectionResult.firstOrNull()
    when (detection?.topClassIdx) {
    5, 6 -> {
    // 宠物 detect_pet
    listOf(Label.DETECT_ANIMAL.key)
    }

    0 -> {
    // 人物 detect_person
    listOf(Label.DETECT_PERSON.key)
    }

    else -> {
    // 检测为空,置为画面变化
    listOf(Label.DETECT_MOVE.key)
    }
    }
    }
    } else {
    // 检测到人脸 detect_face
    listOf(Label.DETECT_PERSON_FACE.key)
    }

ReentrantLock

  • ReentrantLock 是 一个锁(Lock)实现类,用于多线程编程中控制对共享资源的访问。可以实现对共享资源的独占访问,避免多个线程同时修改共享数据导致的竞态条件和数据不一致问题,用于实现线程之间的同步,确保在同一时刻只有一个线程能够访问共享资源,其他线程需要等待锁释放后才能访问。

    • 可重入性ReentrantLock 具有可重入性,同一个线程可以多次获取同一把锁,避免死锁情况。
    • 公平性ReentrantLock 可以设置为公平锁,按照线程请求的顺序获取锁,减少线程饥饿情况。
    • 灵活性ReentrantLock 提供了丰富的方法,如尝试获取锁、设置超时等待、条件变量等,更灵活地控制线程的等待和唤醒。

    try 块中获取锁,在 finally 块中释放锁,确保即使发生异常也能正确释放锁,避免资源泄漏。

    提供了一种更灵活、可控的锁机制,适用于复杂的多线程同步场景。但需要注意的是,在使用过程中要确保正确地获取和释放锁,以避免死锁和性能问题。

DiskLruCacheOfFrameOfAlarm

其中diskLruCacheOfFrameOfAlarm是用于缓存报警帧图像的类 DiskLruCacheOfFrameOfAlarm,通过 DiskLruCache 实现对报警帧图像的缓存操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
open class DiskLruCacheOfFrameOfAlarm @Inject constructor(@ApplicationContext private val context: Context) {

private val diskLruCache: DiskLruCache = DiskLruCache.open(
File(context.cacheDir, "alarm_frame"),
0, // 固定版本号为0,保证APP升级时旧的缓存不被清理
1, // 一个Key仅对应1个文件
1024L * 1024 * 100
)

// 从缓存中获取指定设备ID和时间戳对应的报警帧图像的Uri
@Throws(IOException::class)
suspend fun get(deviceId: String, timestamp: Long): Uri? {
return withContext(Dispatchers.IO) {
val key = String.format("%s_%s", deviceId, timestamp)
diskLruCache.get(key)?.getFile(0)?.toUri()
}
}

// 将报警帧图像缓存到磁盘中
@Throws(IOException::class)
suspend fun put(deviceId: String, timestamp: Long, frame: InputStream, logger: Logger) {
withContext(Dispatchers.IO) {
val key = String.format("%s_%s", deviceId, timestamp)
try {
val editor = diskLruCache.edit(key)?: throw IOException("Editor of file create fail!")
editor.newOutputStream(0)?.use { fileOutputStream -> frame.use { frame.copyTo(fileOutputStream) }}
?: throw IOException("OutputStream of file create fail!")
editor.commit()
} catch (e: Exception) {
logger.e("DiskLruCacheOfFrameOfAlarm", "DiskLruCacheOfFrameOfAlarm put error ", e)
}
}
}

}
    • get(deviceId: String, timestamp: Long): Uri? 方法用于从缓存中获取指定设备ID和时间戳对应的报警帧图像的 Uri
      • 通过 withContext(Dispatchers.IO) 在 IO 线程中执行操作,避免阻塞主线程。
      • 根据设备ID和时间戳生成唯一的 key,通过该 key 从 DiskLruCache 中获取对应的缓存文件,并将其转换为 Uri 返回。
    • put(deviceId: String, timestamp: Long, frame: InputStream, logger: Logger) 方法用于将报警帧图像缓存到磁盘中。
      • 通过 withContext(Dispatchers.IO) 在 IO 线程中执行操作。
      • 根据设备ID和时间戳生成唯一的 key,通过该 key 获取 DiskLruCache 的编辑器(editor),将报警帧图像写入缓存文件中。
      • 最后提交编辑器以完成缓存操作,如果出现异常则记录日志。
  1. 目的和优势
    • 目的:该类的主要目的是实现对报警帧图像的缓存,以提高图像访问速度和节省网络带宽。
    • 优势
      • 快速访问:通过将报警帧图像缓存到磁盘上,可以减少重复下载和处理,提高访问速度。
      • 节省资源:缓存可以减少对网络和服务器资源的依赖,节省带宽和服务器负载。
      • 数据一致性:通过缓存可以保持数据的一致性,避免多次下载相同的数据。

currentComputeStateSubject

用户记录当前设备状态

1
2
3
4
5
6
7
8
9
10
11
data class ComputeState(
val networkSyncable: Boolean = false,
val batteryComputable: Boolean = false,
val thermalHigh: Boolean = false,
val readyToTriggerCompute: Boolean = false,
val waitComputePeriods: Int = 0,
val generatedClips: Long = 0,
val computedKeyFrames: Long = 0,
)

private val currentComputeStateSubject = BehaviorSubject.create<ComputeState>()

reportKeyframeComputed中会将当前要上报的帧数+1,并且调用下面这个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private fun reduceCurrentComputeState() {
val syncable = (isWiFi || syncWithCellular)
val computable = (batteryLevel > minBatteryLevel)

// TODO CPU温度低版本Android系统拿不到,后续再看下怎么弄
val thermalState = 0
val thermalHigh = false
// TODO 温控策略,待进一步修改
val thermalReady = generatedClipsCount < 20 || !thermalHigh

val readyToTrigger = syncable && computable && thermalReady

val newState = ComputeState(
syncable, computable, thermalHigh, readyToTrigger,
waitComputePeriodsSum, generatedClipsCount, computedKeyFrameCount
)
logger.i(TAG, "reduceCurrentComputeState, $newState")
currentComputeStateSubject.onNext(newState)
}

在这里面

1
2
3
4
isWiFi = env.netType == NET_TYPE_WIFI   //网络类型,1:WiFi 2:移动网络(手机设备获取)
syncWithCellular = env.allowSyncOnMobileData == ALLOW_SYNC_ON_MOBILE_DATA_YES // 允许在移动网络下进行数据同步,1:是 2:否(用户选择)
batteryLevel = env.soc / 100f // 当前剩余电量百分比
minBatteryLevel = env.allowComputeMinSoc / 100f // 允许进行计算的最低电量百分比
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 当前网络是否不按量计费,可以传输大量数据
*/
fun isUsingNotMeteredNetwork(context: Context): Boolean {
val manager = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
manager.activeNetwork?.let { network ->
manager.getNetworkCapabilities(network)?.let {
// 高于 Android M,判断用户是否设置当前网络为不限流量
return it.hasCapability(NetworkCapabilities.NET_CAPABILITY_NOT_METERED)
}
}
return false
} else {
@Suppress("DEPRECATION") // 低于 Android M 的兼容逻辑,认为WiFi和有线网络是不计费网络
manager.activeNetworkInfo?.let { networkInfo ->
return networkInfo.type == ConnectivityManager.TYPE_WIFI || networkInfo.type == ConnectivityManager.TYPE_ETHERNET
}
return false
}
}

子码流

这个主要是用于首页产生的视频

i帧验证和子码流,用于管线加速,小码流拉视频变快, 可以用于实时性的识别场景,是离线场景,例如上一秒在家门口有危险手机打开之后拉取小码流数据可以几乎实时获取到,手机马上有提醒。原始的4k图片很大,虽然更清晰但速度很慢,可能需要十几分钟才可以计算完成,不够高效,而子码流因为码率很低所以不会卡,这个方法可以满足实时性,任何一台手机打开,基本不消耗流量,就可以得到一个实时的安防设备,并且用的是用户的手机流量和性能,基本不占用公司的服务器

主要目的:加速管线,实时性

同时测试了yolov8在不同码流下的检测速度,并结合手机性能和内存区分可以跑什么模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
val totalMemorySize =
extraParams?.getLong(KEY_TOTAL_MEMORY_SIZE, -1)?.let { if (it <= 0) null else it }

interpreter = if (totalMemorySize != null && totalMemorySize < MEMORY_SIZE_6GB) {
// 实际内存小于6G的设备,使用CPU运算,大于等于6G的设备,使用GPU运算
baseInterpreter(model)
} else {
// 依次尝试GPU/NNAPI/CPU
gpuDelegateInterpreterOrNull(model)
// 目前使用的是浮点的模型,如果有些设备支持了NPU并且自动使用了NPU会导致精度下降,因此先注释掉该配置
// ?: nnApiDelegateInterpreterOrNull(model)
?: baseInterpreter(model)
}

fun baseInterpreter(model: MappedByteBuffer): Interpreter {
val cpuOpt = Interpreter.Options()
cpuOpt.numThreads = 2 // 尝试并行地使用多个线程,在多核CPU设备上运行时可以显著提高推理速度
logger.i("baseInterpreter t=${cpuOpt.numThreads}")
return Interpreter(model, cpuOpt)
}

fun gpuDelegateInterpreterOrNull(model: MappedByteBuffer): Interpreter? {
val gpuOpt = Interpreter.Options()
gpuOpt.numThreads = 1 // 在单线程模式下运行
gpuOpt.addDelegate(GpuDelegate())
return try {
val it = Interpreter(model, gpuOpt)
logger.i("gpuDelegateInterpreterOrNull success")
it
} catch (e: Exception) {
logger.e("gpuDelegateInterpreterOrNull error", e)
null
}
}

该项目的应用中首先从sdk获取到需要计算的一大段视频起止时间,是小码流数据,之后通过pretask得到这一段视频的分数,如果有分数证明这一段是有价值的视频(检测到人、宠物、人脸等),根据原始视频的长度截取一段最终在首页显示的视频长度,通过滑动窗口的方法得到分数最大的一段视频返给sdk,触发正式管线的计算

I帧和pre都是子码流

Powered by Hexo & Theme Keep
Unique Visitor Page View