Browse Source
Custom cache layer for cover art which ignores (pre-signed URL) query strings. See merge request funkwhale/funkwhale-android!288 (pipelines/28335)
Ryan Harg
2 years ago
19 changed files with 604 additions and 83 deletions
@ -0,0 +1,89 @@
@@ -0,0 +1,89 @@
|
||||
package audio.funkwhale.ffa.utils |
||||
|
||||
import java.lang.ref.WeakReference |
||||
import java.util.WeakHashMap |
||||
import java.util.concurrent.ConcurrentHashMap |
||||
|
||||
/**
 * Similar to a Map, but with the semantic that operations single-thread on a per-key basis.
 * That is: given concurrent accesses to keys "apple" and "banana", one "apple" thread
 * will block all other "apple" threads, but not any "banana" threads.
 * In practical terms, we use this to make sure we don't get weird edge cases when working
 * with the filesystem cache.
 */
class Bottleneck<T> {
  // It would be nice to use LruCache here, but its behavior of
  // replacing values doesn't get us the right results.
  // As it is, this should be a trivial amount of memory compared to
  // images and media.
  // We single-thread access to this map, so it doesn't need to be concurrent.
  //
  // The canonical key is stored as a WeakReference value: per the WeakHashMap
  // contract, a value that strongly refers to its own key pins the entry
  // forever. Storing the key directly (`keys[key] = key`) would therefore
  // leak one entry per key ever seen.
  private val keys = WeakHashMap<String, WeakReference<String>>()

  // This one needs to be concurrent, as we don't want to single-thread it.
  private val values = ConcurrentHashMap<String, WeakReference<T>>()

  /**
   * As you would expect from the Map function of the same name, except concurrent
   * accesses to the same key will block on each other. If the first call succeeds,
   * all other calls will fall through with the same result. (Unlike LRUCache.)
   *
   * @param key lookup key; equal strings map to the same slot regardless of identity
   * @param materialize invoked (under the per-key lock) only on a cache miss;
   *   a null result is not cached, so later calls will retry
   * @return the cached or freshly-materialized value, or null if materialize returned null
   */
  fun getOrCompute(key: String, materialize: (key: String) -> T?): T? {
    // First, get the lockable version of the key, no matter how many
    // equal copies of the key exist; all of them lock on the same object.
    val sharedKey: String = canonical(key)
    synchronized(sharedKey) {
      val ref = values[sharedKey]
      var value = ref?.get()
      if (value == null) {
        if (ref != null) {
          values.remove(sharedKey) // drop the cleared (empty) reference
        }
        value = materialize(sharedKey)
        if (value != null) {
          values[sharedKey] = WeakReference(value)
        }
      }
      return value
    }
  }

  /**
   * The beating heart of this system: each key is "upgraded" to the one
   * we use for locking. This does mean we block on access to `keys` for
   * all concurrent access, but as it's so lightweight, this shouldn't be
   * much of a problem in practical terms.
   * The hope here is that this is slightly better than interning.
   */
  private fun canonical(key: String): String {
    synchronized(keys) {
      val existing = keys[key]?.get()
      if (existing != null) {
        return existing
      }
      // First key of its value becomes canonical. Held only weakly, so
      // the entry can be evicted once no caller retains the string.
      keys[key] = WeakReference(key)
      return key
    }
  }

  /**
   * Invalidate a key and run the supplied bi-consumer with the old value.
   * Note that this will <em>always</em> run the supplied block, even if
   * the value is not in the cache (in which case it receives null).
   */
  fun remove(key: String, andDo: ((T?, String) -> Unit)?) {
    val sharedKey = canonical(key)
    synchronized(sharedKey) {
      val oldValue = values.remove(sharedKey)
      if (andDo != null) {
        andDo(oldValue?.get(), sharedKey)
      }
    }
  }
}
@ -0,0 +1,260 @@
@@ -0,0 +1,260 @@
|
||||
package audio.funkwhale.ffa.utils |
||||
|
||||
import android.content.Context |
||||
import android.net.Uri |
||||
import android.util.Log |
||||
import audio.funkwhale.ffa.BuildConfig |
||||
import audio.funkwhale.ffa.R |
||||
import com.squareup.picasso.Downloader |
||||
import com.squareup.picasso.NetworkPolicy |
||||
import com.squareup.picasso.OkHttp3Downloader |
||||
import com.squareup.picasso.Picasso |
||||
import com.squareup.picasso.Picasso.LoadedFrom |
||||
import com.squareup.picasso.Request |
||||
import com.squareup.picasso.RequestCreator |
||||
import com.squareup.picasso.RequestHandler |
||||
import okhttp3.CacheControl |
||||
import okhttp3.HttpUrl |
||||
import okhttp3.OkHttpClient |
||||
import okio.Okio |
||||
import java.io.File |
||||
import java.security.MessageDigest |
||||
|
||||
/**
 * Render this byte array as a lowercase hexadecimal string
 * (two hex digits per byte).
 */
fun ByteArray.toHex(): String = buildString {
  for (byte in this@toHex) {
    append("%02x".format(byte))
  }
}
||||
|
||||
/**
 * Convert the string to its SHA-256 hash, rendered as lowercase hex.
 * The string is hashed as its UTF-8 bytes.
 */
fun String.sha256(): String =
  MessageDigest.getInstance("SHA-256").digest(encodeToByteArray()).toHex()
||||
|
||||
/**
 * Remove the query string and fragment from a URI, returning the result
 * as a plain string suitable for use as a cache key.
 * Mostly, this is to get rid of pre-signed URL silliness (expiring
 * signature query params that change on every request).
 * If we ever need to keep some query params, we'll need a more robust approach.
 */
// NOTE(review): fragment("") sets an *empty* fragment rather than clearing it —
// confirm against android.net.Uri.Builder whether this renders a trailing "#".
// As long as it is applied consistently the key is still stable, but
// fragment(null) may be the cleaner spelling.
fun Uri.asStableKey(): String = buildUpon().clearQuery().fragment("").build().toString()
||||
|
||||
/**
 * Try to extract a file suffix (e.g. ".jpg") from the URI's path. This isn't
 * strictly necessary, but it can make debugging easier when you're going
 * through the app cache with a filesystem browser.
 *
 * @return the lowercase extension including its leading dot, or "" when the
 *   path is absent or its final segment has no extension
 */
fun Uri.fileSuffix(): String {
  // Only look at the final path segment: a "." in a directory name
  // (e.g. "/v1.2/covers/art") must not yield a bogus suffix like ".2/covers/art".
  val lastSegment = path?.substringAfterLast('/') ?: ""
  val ext = lastSegment.substringAfterLast('.', "").lowercase()
  return if (ext.isEmpty()) "" else ".$ext"
}
||||
|
||||
/**
 * Wrapper around Picasso with some smarter caching of image files.
 * Images are cached on disk under a stable (query-stripped, hashed) name,
 * so pre-signed URLs whose query params change on every request still hit
 * the same cache entry.
 */
open class CoverArt private constructor() {
  companion object {
    // For logging
    val TAG: String = CoverArt::class.java.simpleName

    // This is just a nice-to-have for API admins
    private const val userAgent =
      "${BuildConfig.APPLICATION_ID} ${BuildConfig.VERSION_NAME} (${BuildConfig.VERSION_CODE})"

    // This client has the UA above, and has HTTP caching intentionally disabled.
    // (Because we cache the images ourselves and cannot rely on replaying requests.)
    // Built with `lazy` (synchronized by default) instead of a check-then-set
    // nullable var, which could race and construct more than one client.
    private val httpClient: OkHttpClient by lazy {
      OkHttpClient.Builder()
        .addInterceptor { chain ->
          chain.proceed(
            chain.request()
              .newBuilder()
              .addHeader("User-Agent", userAgent)
              .build()
          )
        }
        .cache(null) // No cache here, intentionally
        .build()
    }

    // Same: this downloader has caching disabled, since it wraps httpClient.
    private val downloader: OkHttp3Downloader by lazy { OkHttp3Downloader(httpClient) }

    // Cache with some useful concurrency semantics. See its docs for details.
    val fileCache = Bottleneck<File>()

    /**
     * We don't need to hang onto the Context, just the path it gets us.
     */
    fun cacheDirForContext(context: Context): File {
      return context.applicationContext.cacheDir.resolve("covers")
    }

    /**
     * Shim for Picasso which acts like a NetworkRequestHandler, but is opinionated
     * about how we want to use it.
     */
    open class CoverNetworkRequestHandler(context: Context) : RequestHandler() {
      /**
       * Path to the actual cache directory.
       */
      val coverCacheDir: File = cacheDirForContext(context)

      /**
       * This goes out with every request and never changes.
       */
      val noCacheControl: CacheControl = CacheControl.Builder()
        .noCache()
        .noStore()
        .noTransform()
        .build()

      init {
        // Make the cache directory if it doesn't already exist.
        // mkdirs() rather than mkdir(): mkdir() fails when any parent
        // directory is missing, which would leave us with no cover cache.
        if (!coverCacheDir.isDirectory) {
          coverCacheDir.mkdirs()
        }
      }

      /**
       * The primary logic of going from a Request to a usable File.
       * tl;dr: Use a local file if you can, otherwise download it and use that.
       * Returns a function suitable for Bottleneck.getOrCompute; it yields
       * null when the URL is unparseable or the download fails.
       */
      private fun materializeFile(request: Request): (String) -> File? {
        return fun(fileName: String): File? {
          val existing = coverCacheDir.resolve(fileName)
          if (existing.isFile) {
            return existing
          }
          val key = request.stableKey ?: request.uri.asStableKey()
          val httpUrl = HttpUrl.parse(request.uri.toString()) ?: return null
          return fetchToFile(httpUrl, fileName, key)
        }
      }

      /**
       * Required by Picasso; we only want to handle HTTP(S) traffic.
       */
      override fun canHandleRequest(data: Request?): Boolean {
        return data != null && ("http" == data.uri.scheme || "https" == data.uri.scheme)
      }

      /**
       * Required by Picasso, this is the main entrypoint.
       */
      override fun load(request: Request?, networkPolicy: Int): Result? {
        if (request == null || !NetworkPolicy.shouldReadFromDiskCache(networkPolicy)) {
          return null
        }
        // Ditch any query params.
        val key = request.stableKey ?: request.uri.asStableKey()
        // Convert to a short, stable filename; keep the file extension
        // for easier forensics.
        val fileName = key.sha256() + request.uri.fileSuffix()
        // Actually find or fetch the file; Bottleneck single-threads
        // concurrent requests for the same file name.
        val file = fileCache.getOrCompute(fileName, materializeFile(request))
        // Hand it back to Picasso in a way it can understand.
        return if (file == null) null else Result(Okio.source(file), LoadedFrom.DISK)
      }

      /**
       * The actual fetch logic: download the URL into the cache file.
       * The download lands in a temporary file first and is renamed into
       * place on success, so an interrupted or empty transfer can never
       * leave a truncated file that a later call would mistake for a
       * valid cache hit.
       *
       * @return the cache file, or null on HTTP failure, a missing/empty
       *   body, or a failed rename
       */
      private fun fetchToFile(httpUrl: HttpUrl, fileName: String, cacheKey: String): File? {
        val httpRequest = okhttp3.Request.Builder()
          .get()
          .url(httpUrl)
          .cacheControl(noCacheControl)
          .build()
        val response = nonCachingDownloader().load(httpRequest)
        if (!response.isSuccessful) {
          return null
        }
        val body = response.body() ?: return null
        if (BuildConfig.DEBUG) {
          Log.d(TAG, "fetchToFile($cacheKey) <- $fileName <- NETWORK")
        }
        val file = coverCacheDir.resolve(fileName)
        val tmp = File.createTempFile("cover", ".part", coverCacheDir)
        try {
          val bytesWritten: Long
          body.use { b ->
            Okio.buffer(Okio.sink(tmp)).use { sink ->
              bytesWritten = sink.writeAll(b.source())
            }
          }
          // An empty download or a failed rename both count as a miss.
          return if (bytesWritten > 0 && tmp.renameTo(file)) file else null
        } finally {
          tmp.delete() // no-op when the rename above succeeded
        }
      }
    }

    /**
     * Picasso can send back notification that files are busted.
     * In those cases, it could be a transient problem, or credentials, etc.
     * We probably don't want to trust the file, so we invalidate it
     * from the memory cache and delete it from the filesystem.
     * This uses Bottleneck, so it's thread-safe.
     */
    fun invalidateIn(context: Context): (Picasso, Uri, Exception) -> Unit {
      val coverCacheDir = cacheDirForContext(context)
      return { _, uri, _ ->
        val key = uri.asStableKey()
        val fileName = key.sha256() + uri.fileSuffix()
        fileCache.remove(fileName) { f, _ ->
          // Bottleneck may no longer hold the File (weak reference cleared),
          // so fall back to the path we would have written it to.
          val file = f ?: coverCacheDir.resolve(fileName)
          if (file.isFile) {
            if (BuildConfig.DEBUG) {
              Log.d(TAG, "Deleting failed cover: $file")
            }
            file.delete()
          }
        }
      }
    }

    /**
     * Low-level Picasso wiring.
     */
    private fun buildPicasso(context: Context) = Picasso.Builder(context)
      // The bulk of the work happens here
      .addRequestHandler(CoverNetworkRequestHandler(context))
      // Be careful with this. There's at least one place in Picasso where it
      // doesn't null-check when logging, so it'll throw errors in places you
      // wouldn't get them with logging turned off. /sigh
      .loggingEnabled(false) // (BuildConfig.DEBUG)
      // Occasionally, we may get transient HTTP issues, or bogus files.
      // Listen for Picasso errors and invalidate those files
      .listener(invalidateIn(context))
      .build()

    /**
     * We don't want to cache the HTTP part of the flow, because:
     * 1. It's double-caching, since we're saving the images already.
     * 2. The URL may include pre-signed credentials, which expire, making the URL useless.
     */
    protected fun nonCachingDownloader(): Downloader = downloader

    /**
     * Same here: a non-caching HTTP client just for cover art.
     */
    protected fun nonCachingHttpClient(): OkHttpClient = httpClient

    /**
     * The primary entrypoint for the codebase.
     */
    fun withContext(context: Context, url: String?): RequestCreator {
      return buildPicasso(context)
        .load(url)
        .placeholder(R.drawable.cover)
    }
  }
}
@ -0,0 +1,207 @@
@@ -0,0 +1,207 @@
|
||||
package audio.funkwhale.ffa.utils |
||||
|
||||
import org.junit.Test |
||||
import strikt.api.expectThat |
||||
import strikt.assertions.isEqualTo |
||||
import strikt.assertions.isFalse |
||||
import strikt.assertions.isNotSameInstanceAs |
||||
import strikt.assertions.isNull |
||||
import strikt.assertions.isSameInstanceAs |
||||
import strikt.assertions.isTrue |
||||
import java.util.concurrent.ArrayBlockingQueue |
||||
import java.util.concurrent.ConcurrentLinkedDeque |
||||
import java.util.concurrent.ThreadPoolExecutor |
||||
import java.util.concurrent.TimeUnit |
||||
import java.util.concurrent.atomic.AtomicBoolean |
||||
import java.util.concurrent.atomic.AtomicInteger |
||||
|
||||
class BottleneckTest {

  @Test
  fun `single threaded cache works like a cache`() {
    // Count materialize invocations: a cache hit must not re-run it.
    var callCount = 0
    val cache = Bottleneck<Int>()
    val materialize = { k: String ->
      callCount++
      k.toInt()
    }
    val key = "34"
    // An equal-but-distinct String instance proves lookups key on value
    // equality, not object identity.
    val keyCopy = String(key.encodeToByteArray().copyOf())
    expectThat(keyCopy).isEqualTo(key)
    expectThat(keyCopy).isNotSameInstanceAs(key)
    expectThat(callCount).isEqualTo(0)
    val first = cache.getOrCompute(key, materialize)
    expectThat(first).isEqualTo(34)
    expectThat(callCount).isEqualTo(1)
    val second = cache.getOrCompute(keyCopy, materialize)
    expectThat(second).isEqualTo(34)
    // Cache hit: same boxed instance back, and materialize not re-run.
    expectThat(second).isSameInstanceAs(first)
    expectThat(callCount).isEqualTo(1)
  }

  @Test
  fun `multi-threaded cache only lets one through for each key at a time`() {
    val maxThreads = 8
    val executor = ThreadPoolExecutor(
      maxThreads,
      maxThreads,
      5,
      TimeUnit.SECONDS,
      ArrayBlockingQueue(maxThreads)
    )
    // `running` flags overlap: it must never already be true when a
    // materialize starts, or two threads were inside at once.
    val running = AtomicBoolean(false)
    val computeCount = AtomicInteger(0)
    val key = "43"
    val materialize = { k: String ->
      expectThat(running.getAndSet(true)).isFalse()
      // Only the very first caller should ever materialize; everyone else
      // must fall through with the cached value.
      expectThat(computeCount.incrementAndGet()).isEqualTo(1)
      Thread.sleep(3000)
      expectThat(running.getAndSet(false)).isTrue()
      expectThat(computeCount.get()).isEqualTo(1)
      k.toInt()
    }
    val cache = Bottleneck<Int>()
    val threadCount = AtomicInteger(0)
    for (c in 1..maxThreads) {
      executor.execute {
        Thread.currentThread().name = "test-thread-$c"
        // Each thread uses its own distinct-but-equal key instance.
        val keyCopy = String(key.encodeToByteArray().copyOf())
        expectThat(cache.getOrCompute(keyCopy, materialize)).isEqualTo(43)
        threadCount.incrementAndGet()
      }
    }
    executor.shutdown()
    executor.awaitTermination(5, TimeUnit.SECONDS)
    // All threads completed within the timeout.
    expectThat(threadCount.get()).isEqualTo(maxThreads)
  }

  @Test
  fun `single-threaded remove does what you would expect`() {
    val cache = Bottleneck<Int>()
    val materialize = { k: String -> k.toInt() }
    val key = "24"
    val first = cache.getOrCompute(key, materialize)
    expectThat(first).isEqualTo(24)
    var callCount = 0
    val keyCopy = String(key.encodeToByteArray().copyOf())
    expectThat(keyCopy).isEqualTo(key)
    expectThat(keyCopy).isNotSameInstanceAs(key)
    // First remove: callback sees the cached value, and the canonical
    // (first-seen) key instance, even though we removed via the copy.
    cache.remove(keyCopy) { value, k ->
      expectThat(value).isSameInstanceAs(first)
      expectThat(k).isSameInstanceAs(key)
      callCount++
    }
    expectThat(callCount).isEqualTo(1)
    // Second remove: the callback still runs, but the value is gone.
    cache.remove(keyCopy) { value, k ->
      expectThat(value).isNull()
      expectThat(k).isSameInstanceAs(key)
      callCount++
    }
    expectThat(callCount).isEqualTo(2)
  }

  @Test
  fun `multi-threaded remove should synchronize and return correct results`() {
    val maxThreads = 8
    val executor = ThreadPoolExecutor(
      maxThreads,
      maxThreads,
      5,
      TimeUnit.SECONDS,
      ArrayBlockingQueue(maxThreads)
    )
    val running = AtomicBoolean(false)
    val computeCount = AtomicInteger(0)
    val key = "17"
    // remove() always runs the callback: exactly one caller should see the
    // cached value; all later callers should see null.
    val dematerialize: (Int?, String) -> Unit = { value: Int?, k: String ->
      expectThat(running.getAndSet(true)).isFalse()
      if (computeCount.incrementAndGet() == 1) {
        expectThat(value).isEqualTo(17)
        Thread.sleep(3000)
        expectThat(computeCount.get()).isEqualTo(1) // no one else gets through until I'm done
      } else {
        expectThat(value).isNull()
      }
      expectThat(running.getAndSet(false)).isTrue()
      // NOTE(review): result discarded — the lambda's type returns Unit,
      // so this final k.toInt() is dead code.
      k.toInt()
    }
    val cache = Bottleneck<Int>()
    // Seed the cache so the first remove has something to hand back.
    cache.getOrCompute(key) { k -> k.toInt() }
    val threadCount = AtomicInteger(0)
    for (c in 1..maxThreads) {
      executor.execute {
        Thread.currentThread().name = "test-thread-$c"
        val keyCopy = String(key.encodeToByteArray().copyOf())
        cache.remove(keyCopy, dematerialize)
        threadCount.incrementAndGet()
      }
    }
    executor.shutdown()
    executor.awaitTermination(5, TimeUnit.SECONDS)
    expectThat(threadCount.get()).isEqualTo(maxThreads)
  }

  @Test
  fun `blocking happens on a per-key basis`() {
    val cache = Bottleneck<Int>()
    val maxThreads = 4
    val executor = ThreadPoolExecutor(
      maxThreads,
      maxThreads,
      5,
      TimeUnit.SECONDS,
      ArrayBlockingQueue(maxThreads)
    )
    // Per-key "currently inside materialize" flags: overlap within a key
    // is a failure, overlap across keys is the point of the test.
    val running: Map<String, AtomicBoolean> = mapOf(
      Pair("tortoise", AtomicBoolean(false)),
      Pair("hare", AtomicBoolean(false)),
    )
    val count: Map<String, AtomicInteger> = mapOf(
      Pair("tortoise", AtomicInteger(0)),
      Pair("hare", AtomicInteger(0)),
    )
    // Ordered journal of start/finish events; the sleeps below choreograph
    // a deterministic interleaving asserted at the end.
    val race = ConcurrentLinkedDeque<String>()
    val threadCount = AtomicInteger(0)
    for (key in arrayListOf("tortoise", "hare")) {
      for (n in 1..2) {
        executor.execute {
          try {
            cache.getOrCompute(String(key.encodeToByteArray().copyOf())) { k ->
              val num = count[key]?.incrementAndGet() ?: -1
              Thread.currentThread().name = "$key-$num"
              threadCount.incrementAndGet()
              if (key == "hare") {
                Thread.sleep(250) // give tortoise a chance to start
              }
              race.add("$key $num started")
              expectThat(running[key]?.getAndSet(true)).isFalse()
              if (num == 1) {
                // tortoise 1 holds its key long enough for both hare
                // entries to start AND finish in the meantime.
                Thread.sleep(if (key == "tortoise") 3000 else 1000)
              }
              expectThat(running[key]?.getAndSet(false)).isTrue()
              race.add("$key $num finished")
              // Return null so the second caller per key also runs
              // materialize instead of hitting the cache.
              null
            }
          } catch (e: Throwable) {
            race.add("Thread $key failed: ${e.message}")
          }
        }
      }
    }
    executor.shutdown()
    executor.awaitTermination(5, TimeUnit.SECONDS)
    expectThat(threadCount.get()).isEqualTo(maxThreads)
    // Hare events interleave inside tortoise 1's window: proof that the
    // two keys never blocked each other, while same-key calls serialized.
    expectThat(race.joinToString("\n")).isEqualTo(
      """
      tortoise 1 started
      hare 1 started
      hare 1 finished
      hare 2 started
      hare 2 finished
      tortoise 1 finished
      tortoise 2 started
      tortoise 2 finished
      """.trimIndent()
    )
  }
}
Loading…
Reference in new issue