1
0
mirror of https://github.com/TeamNewPipe/NewPipe synced 2025-01-25 08:26:57 +00:00

Convert subscription import service to a worker

This commit is contained in:
Isira Seneviratne 2024-11-28 09:53:25 +05:30
parent affd64938b
commit 6046e9642b
12 changed files with 222 additions and 625 deletions

View File

@ -92,7 +92,6 @@
android:name="androidx.work.impl.foreground.SystemForegroundService"
android:foregroundServiceType="dataSync"
tools:node="merge" />
<service android:name=".local.subscription.services.SubscriptionsImportService" />
<service android:name=".local.feed.service.FeedLoadService" />
<activity

View File

@ -3,7 +3,6 @@ package org.schabi.newpipe.local.subscription;
import static org.schabi.newpipe.util.Localization.assureCorrectAppLanguage;
import android.app.Dialog;
import android.content.Intent;
import android.os.Bundle;
import androidx.annotation.NonNull;
@ -11,39 +10,72 @@ import androidx.annotation.Nullable;
import androidx.appcompat.app.AlertDialog;
import androidx.fragment.app.DialogFragment;
import androidx.fragment.app.Fragment;
import androidx.work.Constraints;
import androidx.work.Data;
import androidx.work.ExistingWorkPolicy;
import androidx.work.NetworkType;
import androidx.work.OneTimeWorkRequest;
import androidx.work.OutOfQuotaPolicy;
import androidx.work.WorkManager;
import com.evernote.android.state.State;
import com.livefront.bridge.Bridge;
import org.schabi.newpipe.R;
import org.schabi.newpipe.local.subscription.workers.SubscriptionImportWorker;
import org.schabi.newpipe.util.Constants;
public class ImportConfirmationDialog extends DialogFragment {
@State
protected Intent resultServiceIntent;
protected int mode;
@State
protected String value;
@State
protected int serviceId;
public static void show(@NonNull final Fragment fragment,
@NonNull final Intent resultServiceIntent) {
final ImportConfirmationDialog confirmationDialog = new ImportConfirmationDialog();
confirmationDialog.setResultServiceIntent(resultServiceIntent);
public static void show(@NonNull final Fragment fragment, final int mode,
@Nullable final String value, final int serviceId) {
final var confirmationDialog = new ImportConfirmationDialog();
confirmationDialog.setData(mode, value, serviceId);
confirmationDialog.show(fragment.getParentFragmentManager(), null);
}
public void setResultServiceIntent(final Intent resultServiceIntent) {
this.resultServiceIntent = resultServiceIntent;
@SuppressWarnings("HiddenField")
public void setData(final int mode, final String value, final int serviceId) {
this.mode = mode;
this.value = value;
this.serviceId = serviceId;
}
@NonNull
@Override
public Dialog onCreateDialog(@Nullable final Bundle savedInstanceState) {
assureCorrectAppLanguage(getContext());
return new AlertDialog.Builder(requireContext())
final var context = requireContext();
assureCorrectAppLanguage(context);
return new AlertDialog.Builder(context)
.setMessage(R.string.import_network_expensive_warning)
.setCancelable(true)
.setNegativeButton(R.string.cancel, null)
.setPositiveButton(R.string.ok, (dialogInterface, i) -> {
if (resultServiceIntent != null && getContext() != null) {
getContext().startService(resultServiceIntent);
}
final var inputData = new Data.Builder()
.putString(SubscriptionImportWorker.KEY_VALUE, value)
.putInt(SubscriptionImportWorker.KEY_MODE, mode)
.putInt(Constants.KEY_SERVICE_ID, serviceId)
.build();
final var constraints = new Constraints.Builder()
.setRequiredNetworkType(NetworkType.CONNECTED)
.build();
final var req = new OneTimeWorkRequest.Builder(SubscriptionImportWorker.class)
.setInputData(inputData)
.setExpedited(OutOfQuotaPolicy.RUN_AS_NON_EXPEDITED_WORK_REQUEST)
.setConstraints(constraints)
.build();
WorkManager.getInstance(context)
.enqueueUniqueWork(SubscriptionImportWorker.WORK_NAME,
ExistingWorkPolicy.APPEND_OR_REPLACE, req);
dismiss();
})
.create();
@ -53,8 +85,8 @@ public class ImportConfirmationDialog extends DialogFragment {
public void onCreate(@Nullable final Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
if (resultServiceIntent == null) {
throw new IllegalStateException("Result intent is null");
if (mode == 0 && value == null && serviceId == 0) {
throw new IllegalStateException("Input data not provided");
}
Bridge.restoreInstanceState(this, savedInstanceState);

View File

@ -3,7 +3,6 @@ package org.schabi.newpipe.local.subscription
import android.app.Activity
import android.content.Context
import android.content.DialogInterface
import android.content.Intent
import android.os.Bundle
import android.os.Parcelable
import android.view.LayoutInflater
@ -49,14 +48,12 @@ import org.schabi.newpipe.local.subscription.item.FeedGroupCarouselItem
import org.schabi.newpipe.local.subscription.item.GroupsHeader
import org.schabi.newpipe.local.subscription.item.Header
import org.schabi.newpipe.local.subscription.item.ImportSubscriptionsHintPlaceholderItem
import org.schabi.newpipe.local.subscription.services.SubscriptionsImportService
import org.schabi.newpipe.local.subscription.services.SubscriptionsImportService.KEY_MODE
import org.schabi.newpipe.local.subscription.services.SubscriptionsImportService.KEY_VALUE
import org.schabi.newpipe.local.subscription.services.SubscriptionsImportService.PREVIOUS_EXPORT_MODE
import org.schabi.newpipe.local.subscription.workers.SubscriptionExportWorker
import org.schabi.newpipe.local.subscription.workers.SubscriptionImportWorker
import org.schabi.newpipe.streams.io.NoFileManagerSafeGuard
import org.schabi.newpipe.streams.io.StoredFileHelper
import org.schabi.newpipe.ui.emptystate.setEmptyStateComposable
import org.schabi.newpipe.util.NO_SERVICE_ID
import org.schabi.newpipe.util.NavigationHelper
import org.schabi.newpipe.util.OnClickGesture
import org.schabi.newpipe.util.ServiceHelper
@ -231,12 +228,10 @@ class SubscriptionFragment : BaseStateFragment<SubscriptionState>() {
}
private fun requestImportResult(result: ActivityResult) {
if (result.data != null && result.resultCode == Activity.RESULT_OK) {
val data = result.data?.dataString
if (data != null && result.resultCode == Activity.RESULT_OK) {
ImportConfirmationDialog.show(
this,
Intent(activity, SubscriptionsImportService::class.java)
.putExtra(KEY_MODE, PREVIOUS_EXPORT_MODE)
.putExtra(KEY_VALUE, result.data?.data)
this, SubscriptionImportWorker.PREVIOUS_EXPORT_MODE, data, NO_SERVICE_ID
)
}
}

View File

@ -1,10 +1,6 @@
package org.schabi.newpipe.local.subscription;
import static org.schabi.newpipe.extractor.subscription.SubscriptionExtractor.ContentSource.CHANNEL_URL;
import static org.schabi.newpipe.local.subscription.services.SubscriptionsImportService.CHANNEL_URL_MODE;
import static org.schabi.newpipe.local.subscription.services.SubscriptionsImportService.INPUT_STREAM_MODE;
import static org.schabi.newpipe.local.subscription.services.SubscriptionsImportService.KEY_MODE;
import static org.schabi.newpipe.local.subscription.services.SubscriptionsImportService.KEY_VALUE;
import android.app.Activity;
import android.content.Intent;
@ -37,7 +33,7 @@ import org.schabi.newpipe.error.UserAction;
import org.schabi.newpipe.extractor.NewPipe;
import org.schabi.newpipe.extractor.exceptions.ExtractionException;
import org.schabi.newpipe.extractor.subscription.SubscriptionExtractor;
import org.schabi.newpipe.local.subscription.services.SubscriptionsImportService;
import org.schabi.newpipe.local.subscription.workers.SubscriptionImportWorker;
import org.schabi.newpipe.streams.io.NoFileManagerSafeGuard;
import org.schabi.newpipe.streams.io.StoredFileHelper;
import org.schabi.newpipe.util.Constants;
@ -168,10 +164,8 @@ public class SubscriptionsImportFragment extends BaseFragment {
}
public void onImportUrl(final String value) {
ImportConfirmationDialog.show(this, new Intent(activity, SubscriptionsImportService.class)
.putExtra(KEY_MODE, CHANNEL_URL_MODE)
.putExtra(KEY_VALUE, value)
.putExtra(Constants.KEY_SERVICE_ID, currentServiceId));
ImportConfirmationDialog.show(this, SubscriptionImportWorker.CHANNEL_URL_MODE, value,
currentServiceId);
}
public void onImportFile() {
@ -186,16 +180,10 @@ public class SubscriptionsImportFragment extends BaseFragment {
}
private void requestImportFileResult(final ActivityResult result) {
if (result.getData() == null) {
return;
}
if (result.getResultCode() == Activity.RESULT_OK && result.getData().getData() != null) {
ImportConfirmationDialog.show(this,
new Intent(activity, SubscriptionsImportService.class)
.putExtra(KEY_MODE, INPUT_STREAM_MODE)
.putExtra(KEY_VALUE, result.getData().getData())
.putExtra(Constants.KEY_SERVICE_ID, currentServiceId));
final String data = result.getData() != null ? result.getData().getDataString() : null;
if (result.getResultCode() == Activity.RESULT_OK && data != null) {
ImportConfirmationDialog.show(this, SubscriptionImportWorker.INPUT_STREAM_MODE,
data, currentServiceId);
}
}

View File

@ -1,233 +0,0 @@
/*
* Copyright 2018 Mauricio Colli <mauriciocolli@outlook.com>
* BaseImportExportService.java is part of NewPipe
*
* License: GPL-3.0+
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.schabi.newpipe.local.subscription.services;
import android.app.Service;
import android.content.Intent;
import android.os.Build;
import android.os.IBinder;
import android.text.TextUtils;
import android.widget.Toast;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.StringRes;
import androidx.core.app.NotificationCompat;
import androidx.core.app.NotificationManagerCompat;
import androidx.core.app.ServiceCompat;
import org.reactivestreams.Publisher;
import org.schabi.newpipe.R;
import org.schabi.newpipe.error.ErrorInfo;
import org.schabi.newpipe.error.ErrorUtil;
import org.schabi.newpipe.error.UserAction;
import org.schabi.newpipe.extractor.subscription.SubscriptionExtractor;
import org.schabi.newpipe.ktx.ExceptionUtils;
import org.schabi.newpipe.local.subscription.SubscriptionManager;
import java.io.FileNotFoundException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.processors.PublishProcessor;
public abstract class BaseImportExportService extends Service {
protected final String TAG = this.getClass().getSimpleName();
protected final CompositeDisposable disposables = new CompositeDisposable();
protected final PublishProcessor<String> notificationUpdater = PublishProcessor.create();
protected NotificationManagerCompat notificationManager;
protected NotificationCompat.Builder notificationBuilder;
protected SubscriptionManager subscriptionManager;
private static final int NOTIFICATION_SAMPLING_PERIOD = 2500;
protected final AtomicInteger currentProgress = new AtomicInteger(-1);
protected final AtomicInteger maxProgress = new AtomicInteger(-1);
protected final ImportExportEventListener eventListener = new ImportExportEventListener() {
@Override
public void onSizeReceived(final int size) {
maxProgress.set(size);
currentProgress.set(0);
}
@Override
public void onItemCompleted(final String itemName) {
currentProgress.incrementAndGet();
notificationUpdater.onNext(itemName);
}
};
protected Toast toast;
@Nullable
@Override
public IBinder onBind(final Intent intent) {
return null;
}
@Override
public void onCreate() {
super.onCreate();
subscriptionManager = new SubscriptionManager(this);
setupNotification();
}
@Override
public void onDestroy() {
super.onDestroy();
disposeAll();
}
protected void disposeAll() {
disposables.clear();
}
/*//////////////////////////////////////////////////////////////////////////
// Notification Impl
//////////////////////////////////////////////////////////////////////////*/
protected abstract int getNotificationId();
@StringRes
public abstract int getTitle();
protected void setupNotification() {
notificationManager = NotificationManagerCompat.from(this);
notificationBuilder = createNotification();
startForeground(getNotificationId(), notificationBuilder.build());
final Function<Flowable<String>, Publisher<String>> throttleAfterFirstEmission = flow ->
flow.take(1).concatWith(flow.skip(1)
.throttleLast(NOTIFICATION_SAMPLING_PERIOD, TimeUnit.MILLISECONDS));
disposables.add(notificationUpdater
.filter(s -> !s.isEmpty())
.publish(throttleAfterFirstEmission)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::updateNotification));
}
protected void updateNotification(final String text) {
notificationBuilder
.setProgress(maxProgress.get(), currentProgress.get(), maxProgress.get() == -1);
final String progressText = currentProgress + "/" + maxProgress;
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
if (!TextUtils.isEmpty(text)) {
notificationBuilder.setContentText(text + " (" + progressText + ")");
}
} else {
notificationBuilder.setContentInfo(progressText);
notificationBuilder.setContentText(text);
}
notificationManager.notify(getNotificationId(), notificationBuilder.build());
}
protected void stopService() {
postErrorResult(null, null);
}
protected void stopAndReportError(final Throwable throwable, final String request) {
stopService();
ErrorUtil.createNotification(this, new ErrorInfo(
throwable, UserAction.SUBSCRIPTION_IMPORT_EXPORT, request));
}
protected void postErrorResult(final String title, final String text) {
disposeAll();
ServiceCompat.stopForeground(this, ServiceCompat.STOP_FOREGROUND_REMOVE);
stopSelf();
if (title == null) {
return;
}
final String textOrEmpty = text == null ? "" : text;
notificationBuilder = new NotificationCompat
.Builder(this, getString(R.string.notification_channel_id))
.setSmallIcon(R.drawable.ic_newpipe_triangle_white)
.setVisibility(NotificationCompat.VISIBILITY_PUBLIC)
.setContentTitle(title)
.setStyle(new NotificationCompat.BigTextStyle().bigText(textOrEmpty))
.setContentText(textOrEmpty);
notificationManager.notify(getNotificationId(), notificationBuilder.build());
}
protected NotificationCompat.Builder createNotification() {
return new NotificationCompat.Builder(this, getString(R.string.notification_channel_id))
.setOngoing(true)
.setProgress(-1, -1, true)
.setSmallIcon(R.drawable.ic_newpipe_triangle_white)
.setVisibility(NotificationCompat.VISIBILITY_PUBLIC)
.setContentTitle(getString(getTitle()));
}
/*//////////////////////////////////////////////////////////////////////////
// Toast
//////////////////////////////////////////////////////////////////////////*/
protected void showToast(@StringRes final int message) {
showToast(getString(message));
}
protected void showToast(final String message) {
if (toast != null) {
toast.cancel();
}
toast = Toast.makeText(this, message, Toast.LENGTH_SHORT);
toast.show();
}
/*//////////////////////////////////////////////////////////////////////////
// Error handling
//////////////////////////////////////////////////////////////////////////*/
protected void handleError(@StringRes final int errorTitle, @NonNull final Throwable error) {
String message = getErrorMessage(error);
if (TextUtils.isEmpty(message)) {
final String errorClassName = error.getClass().getName();
message = getString(R.string.error_occurred_detail, errorClassName);
}
showToast(errorTitle);
postErrorResult(getString(errorTitle), message);
}
protected String getErrorMessage(final Throwable error) {
String message = null;
if (error instanceof SubscriptionExtractor.InvalidSourceException) {
message = getString(R.string.invalid_source);
} else if (error instanceof FileNotFoundException) {
message = getString(R.string.invalid_file);
} else if (ExceptionUtils.isNetworkRelated(error)) {
message = getString(R.string.network_error);
}
return message;
}
}

View File

@ -1,17 +0,0 @@
package org.schabi.newpipe.local.subscription.services;
public interface ImportExportEventListener {
/**
* Called when the size has been resolved.
*
* @param size how many items there are to import/export
*/
void onSizeReceived(int size);
/**
* Called every time an item has been parsed/resolved.
*
* @param itemName the name of the subscription item
*/
void onItemCompleted(String itemName);
}

View File

@ -1,326 +0,0 @@
/*
* Copyright 2018 Mauricio Colli <mauriciocolli@outlook.com>
* SubscriptionsImportService.java is part of NewPipe
*
* License: GPL-3.0+
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.schabi.newpipe.local.subscription.services;
import static org.schabi.newpipe.MainActivity.DEBUG;
import static org.schabi.newpipe.streams.io.StoredFileHelper.DEFAULT_MIME;
import android.content.Intent;
import android.net.Uri;
import android.text.TextUtils;
import android.util.Log;
import android.util.Pair;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.core.content.IntentCompat;
import androidx.localbroadcastmanager.content.LocalBroadcastManager;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.schabi.newpipe.App;
import org.schabi.newpipe.R;
import org.schabi.newpipe.database.subscription.SubscriptionEntity;
import org.schabi.newpipe.extractor.NewPipe;
import org.schabi.newpipe.extractor.channel.ChannelInfo;
import org.schabi.newpipe.extractor.channel.tabs.ChannelTabInfo;
import org.schabi.newpipe.ktx.ExceptionUtils;
import org.schabi.newpipe.local.subscription.workers.SubscriptionItem;
import org.schabi.newpipe.streams.io.SharpInputStream;
import org.schabi.newpipe.streams.io.StoredFileHelper;
import org.schabi.newpipe.util.Constants;
import org.schabi.newpipe.util.ExtractorHelper;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Notification;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class SubscriptionsImportService extends BaseImportExportService {
public static final int CHANNEL_URL_MODE = 0;
public static final int INPUT_STREAM_MODE = 1;
public static final int PREVIOUS_EXPORT_MODE = 2;
public static final String KEY_MODE = "key_mode";
public static final String KEY_VALUE = "key_value";
/**
* A {@link LocalBroadcastManager local broadcast} will be made with this action
* when the import is successfully completed.
*/
public static final String IMPORT_COMPLETE_ACTION = App.PACKAGE_NAME + ".local.subscription"
+ ".services.SubscriptionsImportService.IMPORT_COMPLETE";
/**
* How many extractions running in parallel.
*/
public static final int PARALLEL_EXTRACTIONS = 8;
/**
* Number of items to buffer to mass-insert in the subscriptions table,
* this leads to a better performance as we can then use db transactions.
*/
public static final int BUFFER_COUNT_BEFORE_INSERT = 50;
private Subscription subscription;
private int currentMode;
private int currentServiceId;
@Nullable
private String channelUrl;
@Nullable
private InputStream inputStream;
@Nullable
private String inputStreamType;
@Override
public int onStartCommand(final Intent intent, final int flags, final int startId) {
if (intent == null || subscription != null) {
return START_NOT_STICKY;
}
currentMode = intent.getIntExtra(KEY_MODE, -1);
currentServiceId = intent.getIntExtra(Constants.KEY_SERVICE_ID, Constants.NO_SERVICE_ID);
if (currentMode == CHANNEL_URL_MODE) {
channelUrl = intent.getStringExtra(KEY_VALUE);
} else {
final Uri uri = IntentCompat.getParcelableExtra(intent, KEY_VALUE, Uri.class);
if (uri == null) {
stopAndReportError(new IllegalStateException(
"Importing from input stream, but file path is null"),
"Importing subscriptions");
return START_NOT_STICKY;
}
try {
final StoredFileHelper fileHelper = new StoredFileHelper(this, uri, DEFAULT_MIME);
inputStream = new SharpInputStream(fileHelper.getStream());
inputStreamType = fileHelper.getType();
if (inputStreamType == null || inputStreamType.equals(DEFAULT_MIME)) {
// mime type could not be determined, just take file extension
final String name = fileHelper.getName();
final int pointIndex = name.lastIndexOf('.');
if (pointIndex == -1 || pointIndex >= name.length() - 1) {
inputStreamType = DEFAULT_MIME; // no extension, will fail in the extractor
} else {
inputStreamType = name.substring(pointIndex + 1);
}
}
} catch (final IOException e) {
handleError(e);
return START_NOT_STICKY;
}
}
if (currentMode == -1 || currentMode == CHANNEL_URL_MODE && channelUrl == null) {
final String errorDescription = "Some important field is null or in illegal state: "
+ "currentMode=[" + currentMode + "], "
+ "channelUrl=[" + channelUrl + "], "
+ "inputStream=[" + inputStream + "]";
stopAndReportError(new IllegalStateException(errorDescription),
"Importing subscriptions");
return START_NOT_STICKY;
}
startImport();
return START_NOT_STICKY;
}
@Override
protected int getNotificationId() {
return 4568;
}
@Override
public int getTitle() {
return R.string.import_ongoing;
}
@Override
protected void disposeAll() {
super.disposeAll();
if (subscription != null) {
subscription.cancel();
}
}
/*//////////////////////////////////////////////////////////////////////////
// Imports
//////////////////////////////////////////////////////////////////////////*/
private void startImport() {
showToast(R.string.import_ongoing);
final var flowable = switch (currentMode) {
case CHANNEL_URL_MODE -> importFromChannelUrl();
case INPUT_STREAM_MODE -> importFromInputStream();
case PREVIOUS_EXPORT_MODE -> importFromPreviousExport();
default -> null;
};
if (flowable == null) {
final String message = "Flowable given by \"importFrom\" is null "
+ "(current mode: " + currentMode + ")";
stopAndReportError(new IllegalStateException(message), "Importing subscriptions");
return;
}
flowable.doOnNext(subscriptionItems ->
eventListener.onSizeReceived(subscriptionItems.size()))
.flatMap(Flowable::fromIterable)
.parallel(PARALLEL_EXTRACTIONS)
.runOn(Schedulers.io())
.map((Function<SubscriptionItem, Notification<Pair<ChannelInfo,
List<ChannelTabInfo>>>>) subscriptionItem -> {
try {
final ChannelInfo channelInfo = ExtractorHelper
.getChannelInfo(subscriptionItem.getServiceId(),
subscriptionItem.getUrl(), true)
.blockingGet();
return Notification.createOnNext(new Pair<>(channelInfo,
Collections.singletonList(
ExtractorHelper.getChannelTab(
subscriptionItem.getServiceId(),
channelInfo.getTabs().get(0), true).blockingGet()
)));
} catch (final Throwable e) {
return Notification.createOnError(e);
}
})
.sequential()
.observeOn(Schedulers.io())
.doOnNext(getNotificationsConsumer())
.buffer(BUFFER_COUNT_BEFORE_INSERT)
.map(upsertBatch())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getSubscriber());
}
private Subscriber<List<SubscriptionEntity>> getSubscriber() {
return new Subscriber<>() {
@Override
public void onSubscribe(final Subscription s) {
subscription = s;
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(final List<SubscriptionEntity> successfulInserted) {
if (DEBUG) {
Log.d(TAG, "startImport() " + successfulInserted.size()
+ " items successfully inserted into the database");
}
}
@Override
public void onError(final Throwable error) {
Log.e(TAG, "Got an error!", error);
handleError(error);
}
@Override
public void onComplete() {
LocalBroadcastManager.getInstance(SubscriptionsImportService.this)
.sendBroadcast(new Intent(IMPORT_COMPLETE_ACTION));
showToast(R.string.import_complete_toast);
stopService();
}
};
}
private Consumer<Notification<Pair<ChannelInfo,
List<ChannelTabInfo>>>> getNotificationsConsumer() {
return notification -> {
if (notification.isOnNext()) {
final String name = notification.getValue().first.getName();
eventListener.onItemCompleted(!TextUtils.isEmpty(name) ? name : "");
} else if (notification.isOnError()) {
final Throwable error = notification.getError();
final Throwable cause = error.getCause();
if (error instanceof IOException) {
throw error;
} else if (cause instanceof IOException) {
throw cause;
} else if (ExceptionUtils.isNetworkRelated(error)) {
throw new IOException(error);
}
eventListener.onItemCompleted("");
}
};
}
private Function<List<Notification<Pair<ChannelInfo, List<ChannelTabInfo>>>>,
List<SubscriptionEntity>> upsertBatch() {
return notificationList -> {
final var infoList = notificationList.stream()
.filter(Notification::isOnNext)
.map(Notification::getValue)
.collect(Collectors.toList());
return subscriptionManager.upsertAll(infoList);
};
}
private Flowable<List<SubscriptionItem>> importFromChannelUrl() {
return Flowable.fromCallable(() -> NewPipe.getService(currentServiceId)
.getSubscriptionExtractor()
.fromChannelUrl(channelUrl))
.map(list -> list.stream()
.map(item -> new SubscriptionItem(item.getServiceId(), item.getUrl(),
item.getName()))
.collect(Collectors.toList()));
}
private Flowable<List<SubscriptionItem>> importFromInputStream() {
Objects.requireNonNull(inputStream);
Objects.requireNonNull(inputStreamType);
return Flowable.fromCallable(() -> NewPipe.getService(currentServiceId)
.getSubscriptionExtractor()
.fromInputStream(inputStream, inputStreamType))
.map(list -> list.stream()
.map(item -> new SubscriptionItem(item.getServiceId(), item.getUrl(),
item.getName()))
.collect(Collectors.toList()));
}
private Flowable<List<SubscriptionItem>> importFromPreviousExport() {
return Flowable.fromCallable(() -> ImportExportJsonHelper.readFrom(inputStream));
}
protected void handleError(@NonNull final Throwable error) {
super.handleError(R.string.subscriptions_import_unsuccessful, error);
}
}

View File

@ -17,15 +17,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.schabi.newpipe.local.subscription.services
package org.schabi.newpipe.local.subscription.workers
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeFromStream
import kotlinx.serialization.json.encodeToStream
import org.schabi.newpipe.extractor.subscription.SubscriptionExtractor.InvalidSourceException
import org.schabi.newpipe.local.subscription.workers.SubscriptionData
import org.schabi.newpipe.local.subscription.workers.SubscriptionItem
import java.io.InputStream
import java.io.OutputStream

View File

@ -23,7 +23,6 @@ import kotlinx.coroutines.withContext
import org.schabi.newpipe.BuildConfig
import org.schabi.newpipe.NewPipeDatabase
import org.schabi.newpipe.R
import org.schabi.newpipe.local.subscription.services.ImportExportJsonHelper
class SubscriptionExportWorker(
appContext: Context,

View File

@ -0,0 +1,153 @@
package org.schabi.newpipe.local.subscription.workers
import android.app.Notification
import android.content.Context
import android.content.pm.ServiceInfo
import android.os.Build
import android.util.Pair
import android.webkit.MimeTypeMap
import androidx.core.app.NotificationCompat
import androidx.core.net.toUri
import androidx.work.CoroutineWorker
import androidx.work.ForegroundInfo
import androidx.work.WorkManager
import androidx.work.WorkerParameters
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.rx3.await
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import org.schabi.newpipe.R
import org.schabi.newpipe.extractor.NewPipe
import org.schabi.newpipe.local.subscription.SubscriptionManager
import org.schabi.newpipe.util.ExtractorHelper
import org.schabi.newpipe.util.KEY_SERVICE_ID
import org.schabi.newpipe.util.NO_SERVICE_ID
class SubscriptionImportWorker(
appContext: Context,
params: WorkerParameters,
) : CoroutineWorker(appContext, params) {
// This is needed for API levels < 31 (Android S).
override suspend fun getForegroundInfo(): ForegroundInfo {
val title = applicationContext.getString(R.string.import_ongoing)
return createForegroundInfo(createNotification(title, null, 0, 0))
}
override suspend fun doWork(): Result {
val mode = inputData.getInt(KEY_MODE, CHANNEL_URL_MODE)
val extractor = NewPipe.getService(inputData.getInt(KEY_SERVICE_ID, NO_SERVICE_ID))
.subscriptionExtractor
val value = inputData.getString(KEY_VALUE) ?: ""
val subscriptions = withContext(Dispatchers.IO) {
if (mode == CHANNEL_URL_MODE) {
extractor
.fromChannelUrl(value)
.map { SubscriptionItem(it.serviceId, it.url, it.name) }
} else {
applicationContext.contentResolver.openInputStream(value.toUri())?.use {
if (mode == INPUT_STREAM_MODE) {
val contentType = MimeTypeMap.getFileExtensionFromUrl(value).ifEmpty { DEFAULT_MIME }
extractor
.fromInputStream(it, contentType)
.map { SubscriptionItem(it.serviceId, it.url, it.name) }
} else {
ImportExportJsonHelper.readFrom(it)
}
} ?: emptyList()
}
}
val mutex = Mutex()
var index = 1
val qty = subscriptions.size
var title =
applicationContext.resources.getQuantityString(R.plurals.load_subscriptions, qty, qty)
val channelInfoList = withContext(Dispatchers.IO.limitedParallelism(PARALLEL_EXTRACTIONS)) {
subscriptions
.map {
async {
val channelInfo =
ExtractorHelper.getChannelInfo(it.serviceId, it.url, true).await()
val channelTab =
ExtractorHelper.getChannelTab(it.serviceId, channelInfo.tabs[0], true).await()
val currentIndex = mutex.withLock { index++ }
val notification = createNotification(title, channelInfo.name, currentIndex, qty)
setForeground(createForegroundInfo(notification))
Pair(channelInfo, listOf(channelTab))
}
}.awaitAll()
}
title = applicationContext.resources.getQuantityString(R.plurals.import_subscriptions, qty, qty)
setForeground(createForegroundInfo(createNotification(title, null, 0, 0)))
index = 0
val subscriptionManager = SubscriptionManager(applicationContext)
for (chunk in channelInfoList.chunked(BUFFER_COUNT_BEFORE_INSERT)) {
withContext(Dispatchers.IO) {
subscriptionManager.upsertAll(chunk)
}
index += chunk.size
setForeground(createForegroundInfo(createNotification(title, null, index, qty)))
}
return Result.success()
}
private fun createNotification(
title: String,
text: String?,
currentProgress: Int,
maxProgress: Int,
): Notification =
NotificationCompat
.Builder(applicationContext, NOTIFICATION_CHANNEL_ID)
.setSmallIcon(R.drawable.ic_newpipe_triangle_white)
.setOngoing(true)
.setProgress(maxProgress, currentProgress, currentProgress == 0)
.setVisibility(NotificationCompat.VISIBILITY_PUBLIC)
.setForegroundServiceBehavior(NotificationCompat.FOREGROUND_SERVICE_IMMEDIATE)
.setContentTitle(title)
.setContentText(text)
.addAction(
R.drawable.ic_close,
applicationContext.getString(R.string.cancel),
WorkManager.getInstance(applicationContext).createCancelPendingIntent(id),
).apply {
if (currentProgress > 0 && maxProgress > 0) {
val progressText = "$currentProgress/$maxProgress"
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
setSubText(progressText)
} else {
setContentInfo(progressText)
}
}
}.build()
private fun createForegroundInfo(notification: Notification): ForegroundInfo {
val serviceType = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) ServiceInfo.FOREGROUND_SERVICE_TYPE_DATA_SYNC else 0
return ForegroundInfo(NOTIFICATION_ID, notification, serviceType)
}
companion object {
private const val NOTIFICATION_ID = 4568
private const val NOTIFICATION_CHANNEL_ID = "newpipe"
private const val DEFAULT_MIME = "application/octet-stream"
private const val PARALLEL_EXTRACTIONS = 8
private const val BUFFER_COUNT_BEFORE_INSERT = 50
const val WORK_NAME = "SubscriptionImportWorker"
const val CHANNEL_URL_MODE = 0
const val INPUT_STREAM_MODE = 1
const val PREVIOUS_EXPORT_MODE = 2
const val KEY_MODE = "key_mode"
const val KEY_VALUE = "key_value"
}
}

View File

@ -866,4 +866,12 @@
<item quantity="one">Exporting %d subscription…</item>
<item quantity="other">Exporting %d subscriptions…</item>
</plurals>
<plurals name="load_subscriptions">
<item quantity="one">Loading %d subscription…</item>
<item quantity="other">Loading %d subscriptions…</item>
</plurals>
<plurals name="import_subscriptions">
<item quantity="one">Importing %d subscription…</item>
<item quantity="other">Importing %d subscriptions…</item>
</plurals>
</resources>

View File

@ -5,6 +5,7 @@ import static org.junit.Assert.fail;
import org.junit.Test;
import org.schabi.newpipe.extractor.subscription.SubscriptionExtractor;
import org.schabi.newpipe.local.subscription.workers.ImportExportJsonHelper;
import org.schabi.newpipe.local.subscription.workers.SubscriptionItem;
import java.io.ByteArrayInputStream;