From e2aa36d083c961048bd652e83280799e50ed55fc Mon Sep 17 00:00:00 2001 From: kapodamy Date: Wed, 5 Dec 2018 01:03:56 -0300 Subject: [PATCH] fast download pausing * fast download pausing * fix UI thread blocking when calling pause() * check running threads before start the download * fix null pointer exception in onDestroy in the download service, without calling onCreate method (android 8) --- .../giga/get/DownloadInitializer.java | 106 ++++++++++------ .../us/shandian/giga/get/DownloadMission.java | 117 +++++++++++++----- .../shandian/giga/get/DownloadRunnable.java | 72 +++++++---- .../giga/get/DownloadRunnableFallback.java | 62 +++++++--- .../giga/service/DownloadManagerService.java | 2 +- 5 files changed, 241 insertions(+), 118 deletions(-) diff --git a/app/src/main/java/us/shandian/giga/get/DownloadInitializer.java b/app/src/main/java/us/shandian/giga/get/DownloadInitializer.java index 298e7be37..ce7ae267c 100644 --- a/app/src/main/java/us/shandian/giga/get/DownloadInitializer.java +++ b/app/src/main/java/us/shandian/giga/get/DownloadInitializer.java @@ -5,6 +5,7 @@ import android.util.Log; import java.io.File; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.RandomAccessFile; import java.net.HttpURLConnection; import java.nio.channels.ClosedByInterruptException; @@ -13,14 +14,16 @@ import us.shandian.giga.util.Utility; import static org.schabi.newpipe.BuildConfig.DEBUG; -public class DownloadInitializer implements Runnable { +public class DownloadInitializer extends Thread { private final static String TAG = "DownloadInitializer"; final static int mId = 0; private DownloadMission mMission; + private HttpURLConnection mConn; DownloadInitializer(@NonNull DownloadMission mission) { mMission = mission; + mConn = null; } @Override @@ -32,10 +35,12 @@ public class DownloadInitializer implements Runnable { try { mMission.currentThreadCount = mMission.threadCount; - HttpURLConnection conn = mMission.openConnection(mId, -1, -1); + mConn = mMission.openConnection(mId, -1, -1); + mMission.establishConnection(mId, mConn); + if (!mMission.running || Thread.interrupted()) return; - mMission.length = Utility.getContentLength(conn); + mMission.length = Utility.getContentLength(mConn); if (mMission.length == 0) { @@ -44,7 +49,7 @@ public class DownloadInitializer implements Runnable { } // check for dynamic generated content - if (mMission.length == -1 && conn.getResponseCode() == 200) { + if (mMission.length == -1 && mConn.getResponseCode() == 200) { mMission.blocks = 0; mMission.length = 0; mMission.fallback = true; @@ -56,50 +61,54 @@ public class DownloadInitializer implements Runnable { } } else { // Open again - conn = mMission.openConnection(mId, mMission.length - 10, mMission.length); + mConn = mMission.openConnection(mId, mMission.length - 10, mMission.length); + mMission.establishConnection(mId, mConn); - int code = conn.getResponseCode(); if (!mMission.running || Thread.interrupted()) return; - if (code == 206) { - if (mMission.currentThreadCount > 1) { - mMission.blocks = mMission.length / DownloadMission.BLOCK_SIZE; + synchronized (mMission.blockState) { + if (mConn.getResponseCode() == 206) { + if (mMission.currentThreadCount > 1) { + mMission.blocks = mMission.length / DownloadMission.BLOCK_SIZE; - if (mMission.currentThreadCount > mMission.blocks) { - mMission.currentThreadCount = (int) mMission.blocks; + if (mMission.currentThreadCount > mMission.blocks) { + mMission.currentThreadCount = (int) mMission.blocks; + } + if (mMission.currentThreadCount <= 0) { + mMission.currentThreadCount = 1; + } + if (mMission.blocks * DownloadMission.BLOCK_SIZE < mMission.length) { + mMission.blocks++; + } + } else { + // if one thread is solicited don't calculate blocks, is useless + mMission.blocks = 1; + mMission.fallback = true; + mMission.unknownLength = false; } - if (mMission.currentThreadCount <= 0) { - mMission.currentThreadCount = 1; - } - if (mMission.blocks * DownloadMission.BLOCK_SIZE < mMission.length) { - mMission.blocks++; + + if (DEBUG) { + Log.d(TAG, "http response code = " + mConn.getResponseCode()); } } else { - // if one thread is solicited don't calculate blocks, is useless - mMission.blocks = 1; + // Fallback to single thread + mMission.blocks = 0; mMission.fallback = true; mMission.unknownLength = false; + mMission.currentThreadCount = 1; + + if (DEBUG) { + Log.d(TAG, "falling back due http response code = " + mConn.getResponseCode()); + } } - if (DEBUG) { - Log.d(TAG, "http response code = " + code); - } - } else { - // Fallback to single thread - mMission.blocks = 0; - mMission.fallback = true; - mMission.unknownLength = false; - mMission.currentThreadCount = 1; - - if (DEBUG) { - Log.d(TAG, "falling back due http response code = " + code); + for (long i = 0; i < mMission.currentThreadCount; i++) { + mMission.threadBlockPositions.add(i); + mMission.threadBytePositions.add(0L); } } - } - for (long i = 0; i < mMission.currentThreadCount; i++) { - mMission.threadBlockPositions.add(i); - mMission.threadBytePositions.add(0L); + if (!mMission.running || Thread.interrupted()) return; } File file; @@ -112,7 +121,7 @@ public class DownloadInitializer implements Runnable { file = new File(file, mMission.name); - // if the name is used by "something", delete it + // if the name is used by another process, delete it if (file.exists() && !file.isFile() && !file.delete()) { mMission.notifyError(DownloadMission.ERROR_FILE_CREATION, null); return; @@ -131,14 +140,16 @@ public class DownloadInitializer implements Runnable { af.seek(mMission.offsets[mMission.current]); af.close(); - if (Thread.interrupted()) return; + if (!mMission.running || Thread.interrupted()) return; mMission.running = false; break; + } catch (InterruptedIOException | ClosedByInterruptException e) { + return; } catch (Exception e) { - if (e instanceof ClosedByInterruptException) { - return; - } else if (e instanceof IOException && e.getMessage().contains("Permission denied")) { + if (!mMission.running) return; + + if (e instanceof IOException && e.getMessage().contains("Permission denied")) { mMission.notifyError(DownloadMission.ERROR_PERMISSION_DENIED, e); return; } @@ -150,11 +161,26 @@ public class DownloadInitializer implements Runnable { return; } - //try again Log.e(TAG, "initializer failed, retrying", e); } } + // hide marquee in the progress bar + mMission.done++; + mMission.start(); } + + @Override + public void interrupt() { + super.interrupt(); + + if (mConn != null) { + try { + mConn.disconnect(); + } catch (Exception e) { + // nothing to do + } + } + } } diff --git a/app/src/main/java/us/shandian/giga/get/DownloadMission.java b/app/src/main/java/us/shandian/giga/get/DownloadMission.java index f3a817ba8..c25d517f1 100644 --- a/app/src/main/java/us/shandian/giga/get/DownloadMission.java +++ b/app/src/main/java/us/shandian/giga/get/DownloadMission.java @@ -122,13 +122,13 @@ public class DownloadMission extends Mission { private transient boolean mWritingToFile; @SuppressWarnings("UseSparseArrays")// LongSparseArray is not serializable - private final HashMap blockState = new HashMap<>(); + final HashMap blockState = new HashMap<>(); final List threadBlockPositions = new ArrayList<>(); final List threadBytePositions = new ArrayList<>(); private transient boolean deleted; int currentThreadCount; - private transient Thread[] threads = null; + private transient Thread[] threads = new Thread[0]; private transient Thread init = null; @@ -238,9 +238,8 @@ public class DownloadMission extends Mission { * @param rangeEnd range end * @return a {@link java.net.URLConnection URLConnection} linking to the URL. * @throws IOException if an I/O exception occurs. - * @throws HttpError if the the http response is not satisfiable */ - HttpURLConnection openConnection(int threadId, long rangeStart, long rangeEnd) throws IOException, HttpError { + HttpURLConnection openConnection(int threadId, long rangeStart, long rangeEnd) throws IOException { URL url = new URL(urls[current]); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setInstanceFollowRedirects(true); @@ -250,29 +249,45 @@ public class DownloadMission extends Mission { if (rangeEnd > 0) req += rangeEnd; conn.setRequestProperty("Range", req); + if (DEBUG) { Log.d(TAG, threadId + ":" + conn.getRequestProperty("Range")); - Log.d(TAG, threadId + ":Content-Length=" + conn.getContentLength() + " Code:" + conn.getResponseCode()); } } - conn.connect(); + return conn; + } + /** + * @param threadId id of the calling thread + * @param conn Opens and establish the communication + * @throws IOException if an error occurred connecting to the server. + * @throws HttpError if the HTTP Status-Code is not satisfiable + */ + void establishConnection(int threadId, HttpURLConnection conn) throws IOException, HttpError { + conn.connect(); int statusCode = conn.getResponseCode(); + + if (DEBUG) { + Log.d(TAG, threadId + ":Content-Length=" + conn.getContentLength() + " Code:" + statusCode); + } + switch (statusCode) { case 204: case 205: case 207: throw new HttpError(conn.getResponseCode()); + case 416: + return;// let the download thread handle this error default: if (statusCode < 200 || statusCode > 299) { throw new HttpError(statusCode); } } - return conn; } + private void notify(int what) { Message m = new Message(); m.what = what; @@ -389,6 +404,11 @@ public class DownloadMission extends Mission { */ public void start() { if (running || current >= urls.length) return; + + // ensure that the previous state is completely paused. + joinForThread(init); + for (Thread thread : threads) joinForThread(thread); + enqueued = false; running = true; errCode = ERROR_NOTHING; @@ -400,7 +420,7 @@ public class DownloadMission extends Mission { init = null; - if (threads == null) { + if (threads.length < 1) { threads = new Thread[currentThreadCount]; } @@ -428,39 +448,37 @@ public class DownloadMission extends Mission { recovered = true; enqueued = false; - if (init != null && init != Thread.currentThread() && init.isAlive()) { - init.interrupt(); - - try { - init.join(); - } catch (InterruptedException e) { - // nothing to do + if (postprocessingRunning) { + if (DEBUG) { + Log.w(TAG, "pause during post-processing is not applicable."); } + return; + } - resetState(); + if (init != null && init.isAlive()) { + init.interrupt(); + synchronized (blockState) { + resetState(); + } return; } if (DEBUG && blocks == 0) { - Log.w(TAG, "pausing a download that can not be resumed."); + Log.w(TAG, "pausing a download that can not be resumed (range requests not allowed by the server)."); } - if (threads == null || Thread.interrupted()) { + if (threads == null || Thread.currentThread().isInterrupted()) { writeThisToFile(); return; } - if (postprocessingRunning) return; - // wait for all threads are suspended before save the state runAsync(-1, () -> { try { for (Thread thread : threads) { - if (thread == Thread.currentThread()) continue; - if (thread.isAlive()) { thread.interrupt(); - thread.join(); + thread.join(5000); } } } catch (Exception e) { @@ -492,7 +510,7 @@ public class DownloadMission extends Mission { threadBlockPositions.clear(); threadBytePositions.clear(); blockState.clear(); - threads = null; + threads = new Thread[0]; Utility.writeToFile(metadata, DownloadMission.this); } @@ -571,28 +589,61 @@ public class DownloadMission extends Mission { } /** - * run a method in a new thread + * run a new thread * * @param id id of new thread (used for debugging only) - * @param who the object whose {@code run} method is invoked when this thread is started - * @return the created thread + * @param who the Runnable whose {@code run} method is invoked. */ - private Thread runAsync(int id, Runnable who) { + private void runAsync(int id, Runnable who) { + runAsync(id, new Thread(who)); + } + + /** + * run a new thread + * + * @param id id of new thread (used for debugging only) + * @param who the Thread whose {@code run} method is invoked when this thread is started + * @return the passed thread + */ + private Thread runAsync(int id, Thread who) { // known thread ids: // -2: state saving by notifyProgress() method // -1: wait for saving the state by pause() method // 0: initializer // >=1: any download thread - Thread thread = new Thread(who); if (DEBUG) { - thread.setName(String.format("[%s] id = %s filename = %s", TAG, id, name)); + who.setName(String.format("%s[%s] %s", TAG, id, name)); } - thread.start(); - return thread; + who.start(); + + return who; } + private void joinForThread(Thread thread) { + if (thread == null || !thread.isAlive()) return; + if (thread == Thread.currentThread()) return; + + if (DEBUG) { + Log.w(TAG, "a thread is !still alive!: " + thread.getName()); + } + + // still alive, this should not happen. + // Possible reasons: + // slow device + // the user is spamming start/pause buttons + // start() method called quickly after pause() + + try { + thread.join(10000); + } catch (InterruptedException e) { + Log.d(TAG, "timeout on join : " + thread.getName()); + throw new RuntimeException("A thread is still running:\n" + thread.getName()); + } + } + + static class HttpError extends Exception { int statusCode; @@ -602,7 +653,7 @@ public class DownloadMission extends Mission { @Override public String getMessage() { - return "Http status code: " + String.valueOf(statusCode); + return "HTTP " + String.valueOf(statusCode); } } } diff --git a/app/src/main/java/us/shandian/giga/get/DownloadRunnable.java b/app/src/main/java/us/shandian/giga/get/DownloadRunnable.java index 336bc13ee..244fbd47a 100644 --- a/app/src/main/java/us/shandian/giga/get/DownloadRunnable.java +++ b/app/src/main/java/us/shandian/giga/get/DownloadRunnable.java @@ -14,16 +14,19 @@ import static org.schabi.newpipe.BuildConfig.DEBUG; * Runnable to download blocks of a file until the file is completely downloaded, * an error occurs or the process is stopped. */ -public class DownloadRunnable implements Runnable { +public class DownloadRunnable extends Thread { private static final String TAG = DownloadRunnable.class.getSimpleName(); private final DownloadMission mMission; private final int mId; + private HttpURLConnection mConn; + DownloadRunnable(DownloadMission mission, int id) { if (mission == null) throw new NullPointerException("mission is null"); mMission = mission; mId = id; + mConn = null; } @Override @@ -47,12 +50,7 @@ public class DownloadRunnable implements Runnable { return; } - while (mMission.errCode == DownloadMission.ERROR_NOTHING && mMission.running && blockPosition < mMission.blocks) { - - if (Thread.currentThread().isInterrupted()) { - mMission.pause(); - return; - } + while (mMission.running && mMission.errCode == DownloadMission.ERROR_NOTHING && blockPosition < mMission.blocks) { if (DEBUG && retry) { Log.d(TAG, mId + ":retry is true. Resuming at " + blockPosition); @@ -83,8 +81,9 @@ public class DownloadRunnable implements Runnable { long start = blockPosition * DownloadMission.BLOCK_SIZE; long end = start + DownloadMission.BLOCK_SIZE - 1; + long offset = mMission.getThreadBytePosition(mId); - start += mMission.getThreadBytePosition(mId); + start += offset; if (end >= mMission.length) { end = mMission.length - 1; @@ -93,14 +92,21 @@ public class DownloadRunnable implements Runnable { long total = 0; try { - HttpURLConnection conn = mMission.openConnection(mId, start, end); + mConn = mMission.openConnection(mId, start, end); + mMission.establishConnection(mId, mConn); + + // check if the download can be resumed + if (mConn.getResponseCode() == 416 && offset > 0) { + retryCount--; + throw new DownloadMission.HttpError(416); + } // The server may be ignoring the range request - if (conn.getResponseCode() != 206) { - mMission.notifyError(new DownloadMission.HttpError(conn.getResponseCode())); + if (mConn.getResponseCode() != 206) { + mMission.notifyError(new DownloadMission.HttpError(mConn.getResponseCode())); if (DEBUG) { - Log.e(TAG, mId + ":Unsupported " + conn.getResponseCode()); + Log.e(TAG, mId + ":Unsupported " + mConn.getResponseCode()); } break; @@ -108,7 +114,8 @@ public class DownloadRunnable implements Runnable { f.seek(mMission.offsets[mMission.current] + start); - is = conn.getInputStream(); + is = mConn.getInputStream(); + byte[] buf = new byte[DownloadMission.BUFFER_SIZE]; int len; @@ -121,18 +128,17 @@ public class DownloadRunnable implements Runnable { if (DEBUG && mMission.running) { Log.d(TAG, mId + ":position " + blockPosition + " finished, " + total + " bytes downloaded"); - mMission.setThreadBytePosition(mId, 0L); } - // if the download is paused, save progress for this thread - if (!mMission.running) { - mMission.setThreadBytePosition(mId, total); - break; - } + if (mMission.running) + mMission.setThreadBytePosition(mId, 0L);// clear byte position for next block + else + mMission.setThreadBytePosition(mId, total);// download paused, save progress for this block + } catch (Exception e) { mMission.setThreadBytePosition(mId, total); - if (e instanceof ClosedByInterruptException) break; + if (!mMission.running || e instanceof ClosedByInterruptException) break; if (retryCount++ >= mMission.maxRetry) { mMission.notifyError(e); @@ -147,29 +153,43 @@ public class DownloadRunnable implements Runnable { } } - try { - f.close(); - } catch (Exception err) { - // ¿ejected media storage? ¿file deleted? ¿storage ran out of space? - } - try { if (is != null) is.close(); } catch (Exception err) { // nothing to do } + try { + f.close(); + } catch (Exception err) { + // ¿ejected media storage? ¿file deleted? ¿storage ran out of space? + } + if (DEBUG) { Log.d(TAG, "thread " + mId + " exited from main download loop"); } + if (mMission.errCode == DownloadMission.ERROR_NOTHING && mMission.running) { if (DEBUG) { Log.d(TAG, "no error has happened, notifying"); } mMission.notifyFinished(); } + if (DEBUG && !mMission.running) { Log.d(TAG, "The mission has been paused. Passing."); } } + + @Override + public void interrupt() { + super.interrupt(); + + try { + if (mConn != null) mConn.disconnect(); + } catch (Exception e) { + // nothing to do + } + } + } diff --git a/app/src/main/java/us/shandian/giga/get/DownloadRunnableFallback.java b/app/src/main/java/us/shandian/giga/get/DownloadRunnableFallback.java index 5ef4ed90e..4bcaeaf85 100644 --- a/app/src/main/java/us/shandian/giga/get/DownloadRunnableFallback.java +++ b/app/src/main/java/us/shandian/giga/get/DownloadRunnableFallback.java @@ -18,30 +18,33 @@ import static org.schabi.newpipe.BuildConfig.DEBUG; /** * Single-threaded fallback mode */ -public class DownloadRunnableFallback implements Runnable { +public class DownloadRunnableFallback extends Thread { private static final String TAG = "DownloadRunnableFallback"; private final DownloadMission mMission; - private int retryCount = 0; + private final int mId = 1; - private InputStream is; - private RandomAccessFile f; + private int mRetryCount = 0; + private InputStream mIs; + private RandomAccessFile mF; + private HttpURLConnection mConn; DownloadRunnableFallback(@NonNull DownloadMission mission) { mMission = mission; - is = null; - f = null; + mIs = null; + mF = null; + mConn = null; } private void dispose() { try { - if (is != null) is.close(); + if (mIs != null) mIs.close(); } catch (IOException e) { // nothing to do } try { - if (f != null) f.close(); + if (mF != null) mF.close(); } catch (IOException e) { // ¿ejected media storage? ¿file deleted? ¿storage ran out of space? } @@ -63,27 +66,36 @@ public class DownloadRunnableFallback implements Runnable { try { long rangeStart = (mMission.unknownLength || start < 1) ? -1 : start; - HttpURLConnection conn = mMission.openConnection(1, rangeStart, -1); + + mConn = mMission.openConnection(mId, rangeStart, -1); + mMission.establishConnection(mId, mConn); + + // check if the download can be resumed + if (mConn.getResponseCode() == 416 && start > 0) { + start = 0; + mRetryCount--; + throw new DownloadMission.HttpError(416); + } // secondary check for the file length if (!mMission.unknownLength) - mMission.unknownLength = Utility.getContentLength(conn) == -1; + mMission.unknownLength = Utility.getContentLength(mConn) == -1; - f = new RandomAccessFile(mMission.getDownloadedFile(), "rw"); - f.seek(mMission.offsets[mMission.current] + start); + mF = new RandomAccessFile(mMission.getDownloadedFile(), "rw"); + mF.seek(mMission.offsets[mMission.current] + start); - is = conn.getInputStream(); + mIs = mConn.getInputStream(); byte[] buf = new byte[64 * 1024]; int len = 0; - while (mMission.running && (len = is.read(buf, 0, buf.length)) != -1) { - f.write(buf, 0, len); + while (mMission.running && (len = mIs.read(buf, 0, buf.length)) != -1) { + mF.write(buf, 0, len); start += len; mMission.notifyProgress(len); } - // if thread goes interrupted check if the last part is written. This avoid re-download the whole file + // if thread goes interrupted check if the last part mIs written. This avoid re-download the whole file done = len == -1; } catch (Exception e) { dispose(); @@ -91,9 +103,9 @@ public class DownloadRunnableFallback implements Runnable { // save position mMission.setThreadBytePosition(0, start); - if (e instanceof ClosedByInterruptException) return; + if (!mMission.running || e instanceof ClosedByInterruptException) return; - if (retryCount++ >= mMission.maxRetry) { + if (mRetryCount++ >= mMission.maxRetry) { mMission.notifyError(e); return; } @@ -110,4 +122,18 @@ public class DownloadRunnableFallback implements Runnable { mMission.setThreadBytePosition(0, start); } } + + @Override + public void interrupt() { + super.interrupt(); + + if (mConn != null) { + try { + mConn.disconnect(); + } catch (Exception e) { + // nothing to do + } + + } + } } diff --git a/app/src/main/java/us/shandian/giga/service/DownloadManagerService.java b/app/src/main/java/us/shandian/giga/service/DownloadManagerService.java index 1bb28fe95..a57fe1734 100755 --- a/app/src/main/java/us/shandian/giga/service/DownloadManagerService.java +++ b/app/src/main/java/us/shandian/giga/service/DownloadManagerService.java @@ -235,7 +235,7 @@ public class DownloadManagerService extends Service { if (icDownloadDone != null) icDownloadDone.recycle(); if (icDownloadFailed != null) icDownloadFailed.recycle(); - icLauncher.recycle(); + if (icLauncher != null) icLauncher.recycle(); } @Override