* feat (Connection Statistics): Restored the server connection statistics feature * perf(store): Optimize data storage performance and implement caching mechanisms - Implement caching mechanisms in SnippetStore and ServerStore to reduce redundant loading - Refactor ConnectionStatsStore to use indexes and optimize query performance - Adopt a more efficient approach when cleaning up expired records - Add a maximum record limit to prevent data bloat * perf(store): Optimize data storage performance and add a caching mechanism Add a caching mechanism to PrivateKeyStore to reduce redundant loading Make the cleanup and index rebuilding of ConnectionStatsStore asynchronous Add database compression and size statistics Display database size in the interface and optimize compression operations * fix (Cache): Fixed cache invalidation and join statistics issues - Added a cache invalidation call to the reload method - Fixed an error in the calculation of join statistics timestamps - Optimized the cache index rebuild logic - Added tooltips and click effects for join statistics * refactor(connection_stats): Convert file operations from synchronous to asynchronous and optimize record cleanup logic Convert the database size retrieval method from synchronous to asynchronous to prevent UI blocking Optimize server record cleanup logic by directly deleting redundant records instead of rebuilding indexes * fix(connection_stats): Fixed an initialization issue when the index database is empty During Stores initialization, the code now checks whether `connectionStats.indexDbKeys` is empty; if so, it calls `rebuildIndexAndCompact` to rebuild and compact the database. Additionally, the implementation of the `_pruneExcessRecords` method has been optimized to use tuples instead of temporary lists, thereby improving performance. A `mounted` check has been added at the UI layer to prevent state update issues during asynchronous operations. * fix(server): Improved error string matching logic to more accurately identify connection issues Error strings are now uniformly converted to lowercase for comparison, and matching criteria have been expanded to cover a wider range of error scenarios, including timeouts, authentication failures, and network errors * fix(PrivateKeyStore): Fixed an issue where the cache state was not updated when clearing the cache When clearing the private key store, ensure that the internal cache state is updated simultaneously to maintain consistency * refactor(store): Add close methods and clean up subscription logic Add close methods to PrivateKeyStore, SnippetStore, and ServerStore to unsubscribe Unify cache cleanup logic to prevent memory leaks * fix(store): Add a cache update suppression mechanism to prevent circular updates Add an _suppressWatch flag to multiple Store classes to suppress cache invalidation during internal operations Add a _putWithoutInvalidatingCache method to prevent recursive watchers from being triggered during data updates * refactor(store): Improve caching and state management using try-finally In PrivateKeyStore, ServerStore, and SnippetStore: 1. Remove redundant close methods 2. Use try-finally to ensure the _suppressWatch state is reset correctly 3. Optimize cache invalidation logic 4. Standardize transaction handling for update operations * refactor(store): Optimize data storage operations and fix potential issues - Ensure the safety and consistency of list operations in ConnectionStatsStore - Replace direct calls to `box.put` with the `set` method in SnippetStore and ServerStore - Extract decoding logic for PrivateKeyStore into a separate method - Add logic to update server-hopping relationships * fix: Fixed an issue where asynchronous operations were not being waited on and optimized storage operations Fixed several issues where asynchronous operations were not being waited on to ensure data consistency Added the _suppressWatch control to ServerStore and PrivateKeyStore Optimized index management in ConnectionStatsStore to maintain record order Added a new GitHub participant * fix: Fixed potential state issues and memory leaks in asynchronous operations Fixed potential state issues that could occur on the server edit page after a delete operation; added a mounted check Changed the statistics clearing operation in connection_stats to run asynchronously Optimized asynchronous operations in PrivateKeyStore and fixed potential memory leaks * refactor(store): Convert asynchronous methods to synchronous ones to simplify the code Fixed an issue where asynchronous operations were not handled correctly on the connection statistics page * fix: Added mounted check and error handling for connection logs Added a mounted check in _ConnectionStatsPageState to prevent the state from being updated after the component is unmounted Added a try-catch block for connection logs in ServerNotifier to catch and log potential storage exceptions
394 lines
14 KiB
Dart
394 lines
14 KiB
Dart
import 'dart:async';
|
|
|
|
import 'package:computer/computer.dart';
|
|
import 'package:dartssh2/dartssh2.dart';
|
|
import 'package:fl_lib/fl_lib.dart';
|
|
import 'package:freezed_annotation/freezed_annotation.dart';
|
|
import 'package:riverpod_annotation/riverpod_annotation.dart';
|
|
import 'package:server_box/core/extension/ssh_client.dart';
|
|
import 'package:server_box/core/utils/server.dart';
|
|
import 'package:server_box/core/utils/ssh_auth.dart';
|
|
import 'package:server_box/data/helper/ssh_decoder.dart';
|
|
import 'package:server_box/data/helper/system_detector.dart';
|
|
import 'package:server_box/data/model/app/error.dart';
|
|
import 'package:server_box/data/model/app/scripts/script_consts.dart';
|
|
import 'package:server_box/data/model/app/scripts/shell_func.dart';
|
|
import 'package:server_box/data/model/server/connection_stat.dart';
|
|
import 'package:server_box/data/model/server/server.dart';
|
|
import 'package:server_box/data/model/server/server_private_info.dart';
|
|
import 'package:server_box/data/model/server/server_status_update_req.dart';
|
|
import 'package:server_box/data/model/server/system.dart';
|
|
import 'package:server_box/data/model/server/try_limiter.dart';
|
|
import 'package:server_box/data/provider/server/all.dart';
|
|
import 'package:server_box/data/res/status.dart';
|
|
import 'package:server_box/data/res/store.dart';
|
|
import 'package:server_box/data/ssh/session_manager.dart';
|
|
|
|
part 'single.g.dart';
|
|
part 'single.freezed.dart';
|
|
|
|
// Individual server state, including connection and status information
|
|
@freezed
|
|
abstract class ServerState with _$ServerState {
|
|
const factory ServerState({
|
|
required Spi spi,
|
|
required ServerStatus status,
|
|
@Default(ServerConn.disconnected) ServerConn conn,
|
|
SSHClient? client,
|
|
}) = _ServerState;
|
|
}
|
|
|
|
// Individual server state management
|
|
@Riverpod(keepAlive: true)
|
|
class ServerNotifier extends _$ServerNotifier {
|
|
@override
|
|
ServerState build(String serverId) {
|
|
final serverNotifier = ref.read(serversProvider);
|
|
final spi = serverNotifier.servers[serverId];
|
|
if (spi == null) {
|
|
throw StateError('Server $serverId not found');
|
|
}
|
|
|
|
return ServerState(spi: spi, status: InitStatus.status);
|
|
}
|
|
|
|
// Update connection status
|
|
void updateConnection(ServerConn conn) {
|
|
state = state.copyWith(conn: conn);
|
|
}
|
|
|
|
// Update server status
|
|
void updateStatus(ServerStatus status) {
|
|
state = state.copyWith(status: status);
|
|
}
|
|
|
|
// Update SSH client
|
|
void updateClient(SSHClient? client) {
|
|
state = state.copyWith(client: client);
|
|
}
|
|
|
|
// Update SPI configuration
|
|
void updateSpi(Spi spi) {
|
|
state = state.copyWith(spi: spi);
|
|
}
|
|
|
|
// Close connection
|
|
void closeConnection() {
|
|
final client = state.client;
|
|
client?.close();
|
|
state = state.copyWith(client: null, conn: ServerConn.disconnected);
|
|
}
|
|
|
|
// Refresh server status
|
|
bool _isRefreshing = false;
|
|
|
|
Future<void> refresh() async {
|
|
if (_isRefreshing) return;
|
|
|
|
_isRefreshing = true;
|
|
try {
|
|
await _updateServer();
|
|
} finally {
|
|
_isRefreshing = false;
|
|
}
|
|
}
|
|
|
|
Future<void> _updateServer() async {
|
|
await _getData();
|
|
}
|
|
|
|
Future<void> _getData() async {
|
|
final spi = state.spi;
|
|
final sid = spi.id;
|
|
|
|
if (!TryLimiter.canTry(sid)) {
|
|
if (state.conn != ServerConn.failed) {
|
|
updateConnection(ServerConn.failed);
|
|
}
|
|
return;
|
|
}
|
|
|
|
final newStatus = state.status..err = null; // Clear previous error
|
|
updateStatus(newStatus);
|
|
|
|
if (state.conn < ServerConn.connecting || (state.client?.isClosed ?? true)) {
|
|
updateConnection(ServerConn.connecting);
|
|
|
|
// Wake on LAN
|
|
final wol = spi.wolCfg;
|
|
if (wol != null) {
|
|
try {
|
|
await wol.wake();
|
|
} catch (e) {
|
|
Loggers.app.warning('Wake on lan failed', e);
|
|
}
|
|
}
|
|
|
|
final time1 = DateTime.now();
|
|
try {
|
|
final client = await genClient(
|
|
spi,
|
|
timeout: Duration(seconds: Stores.setting.timeout.fetch()),
|
|
onKeyboardInteractive: (_) => KeybordInteractive.defaultHandle(spi),
|
|
);
|
|
updateClient(client);
|
|
|
|
final time2 = DateTime.now();
|
|
final spentTime = time2.difference(time1).inMilliseconds;
|
|
if (spi.jumpId == null) {
|
|
Loggers.app.info('Connected to ${spi.name} in $spentTime ms.');
|
|
} else {
|
|
Loggers.app.info('Jump to ${spi.name} in $spentTime ms.');
|
|
}
|
|
|
|
try {
|
|
await Stores.connectionStats.recordConnection(ConnectionStat(
|
|
serverId: spi.id,
|
|
serverName: spi.name,
|
|
timestamp: time1,
|
|
result: ConnectionResult.success,
|
|
durationMs: spentTime,
|
|
));
|
|
} catch (e) {
|
|
Loggers.app.warning('Failed to record connection success', e);
|
|
}
|
|
|
|
final sessionId = 'ssh_${spi.id}';
|
|
TermSessionManager.add(
|
|
id: sessionId,
|
|
spi: spi,
|
|
startTimeMs: time1.millisecondsSinceEpoch,
|
|
disconnect: () => ref.read(serversProvider.notifier).closeOneServer(spi.id),
|
|
status: TermSessionStatus.connecting,
|
|
);
|
|
TermSessionManager.setActive(sessionId, hasTerminal: false);
|
|
} catch (e) {
|
|
TryLimiter.inc(sid);
|
|
|
|
final durationMs = DateTime.now().difference(time1).inMilliseconds;
|
|
|
|
ConnectionResult failureResult;
|
|
final errStr = e.toString().toLowerCase();
|
|
if (errStr.contains('timed out') || errStr.contains('timeout')) {
|
|
failureResult = ConnectionResult.timeout;
|
|
} else if (errStr.contains('auth') || errStr.contains('authentication') || errStr.contains('permission denied') || errStr.contains('access denied')) {
|
|
failureResult = ConnectionResult.authFailed;
|
|
} else if (errStr.contains('connection refused') || errStr.contains('no route to host') || errStr.contains('network') || errStr.contains('socket')) {
|
|
failureResult = ConnectionResult.networkError;
|
|
} else {
|
|
failureResult = ConnectionResult.unknownError;
|
|
}
|
|
|
|
try {
|
|
await Stores.connectionStats.recordConnection(ConnectionStat(
|
|
serverId: spi.id,
|
|
serverName: spi.name,
|
|
timestamp: time1,
|
|
result: failureResult,
|
|
errorMessage: e.toString(),
|
|
durationMs: durationMs,
|
|
));
|
|
} catch (recErr) {
|
|
Loggers.app.warning('Failed to record connection failure', recErr);
|
|
}
|
|
|
|
final newStatus = state.status..err = SSHErr(type: SSHErrType.connect, message: e.toString());
|
|
updateStatus(newStatus);
|
|
updateConnection(ServerConn.failed);
|
|
|
|
// Remove SSH session when connection fails
|
|
final sessionId = 'ssh_${spi.id}';
|
|
TermSessionManager.remove(sessionId);
|
|
|
|
Loggers.app.warning('Connect to ${spi.name} failed', e);
|
|
return;
|
|
}
|
|
|
|
updateConnection(ServerConn.connected);
|
|
|
|
// Update SSH session status to connected
|
|
final sessionId = 'ssh_${spi.id}';
|
|
TermSessionManager.updateStatus(sessionId, TermSessionStatus.connected);
|
|
|
|
try {
|
|
// Detect system type
|
|
final detectedSystemType = await SystemDetector.detect(state.client!, spi);
|
|
final newStatus = state.status..system = detectedSystemType;
|
|
updateStatus(newStatus);
|
|
|
|
Loggers.app.info('Writing script for ${spi.name} (${detectedSystemType.name})');
|
|
|
|
final (stdoutResult, writeScriptResult) = await state.client!.execSafe(
|
|
(session) async {
|
|
final scriptRaw = ShellFuncManager.allScript(
|
|
spi.custom?.cmds,
|
|
systemType: detectedSystemType,
|
|
disabledCmdTypes: spi.disabledCmdTypes,
|
|
).uint8List;
|
|
session.stdin.add(scriptRaw);
|
|
session.stdin.close();
|
|
},
|
|
entry: ShellFuncManager.getInstallShellCmd(
|
|
spi.id,
|
|
systemType: detectedSystemType,
|
|
customDir: spi.custom?.scriptDir,
|
|
),
|
|
systemType: detectedSystemType,
|
|
context: 'WriteScript<${spi.name}>',
|
|
);
|
|
|
|
if (stdoutResult.isNotEmpty) {
|
|
Loggers.app.info('Script write stdout for ${spi.name}: $stdoutResult');
|
|
}
|
|
|
|
if (writeScriptResult.isNotEmpty) {
|
|
Loggers.app.warning('Script write stderr for ${spi.name}: $writeScriptResult');
|
|
if (detectedSystemType != SystemType.windows) {
|
|
ShellFuncManager.switchScriptDir(spi.id, systemType: detectedSystemType);
|
|
throw writeScriptResult;
|
|
}
|
|
} else {
|
|
Loggers.app.info('Script written successfully for ${spi.name}');
|
|
}
|
|
} on SSHAuthAbortError catch (e) {
|
|
TryLimiter.inc(sid);
|
|
final err = SSHErr(type: SSHErrType.auth, message: e.toString());
|
|
final newStatus = state.status..err = err;
|
|
updateStatus(newStatus);
|
|
Loggers.app.warning(err);
|
|
updateConnection(ServerConn.failed);
|
|
|
|
final sessionId = 'ssh_${spi.id}';
|
|
TermSessionManager.updateStatus(sessionId, TermSessionStatus.disconnected);
|
|
return;
|
|
} on SSHAuthFailError catch (e) {
|
|
TryLimiter.inc(sid);
|
|
final err = SSHErr(type: SSHErrType.auth, message: e.toString());
|
|
final newStatus = state.status..err = err;
|
|
updateStatus(newStatus);
|
|
Loggers.app.warning(err);
|
|
updateConnection(ServerConn.failed);
|
|
|
|
final sessionId = 'ssh_${spi.id}';
|
|
TermSessionManager.updateStatus(sessionId, TermSessionStatus.disconnected);
|
|
return;
|
|
} catch (e) {
|
|
final err = SSHErr(type: SSHErrType.writeScript, message: e.toString());
|
|
final newStatus = state.status..err = err;
|
|
updateStatus(newStatus);
|
|
Loggers.app.warning(err);
|
|
updateConnection(ServerConn.failed);
|
|
|
|
final sessionId = 'ssh_${spi.id}';
|
|
TermSessionManager.updateStatus(sessionId, TermSessionStatus.disconnected);
|
|
}
|
|
}
|
|
|
|
if (state.conn == ServerConn.connecting) return;
|
|
|
|
// Keep finished status to prevent UI from refreshing to loading state
|
|
if (state.conn != ServerConn.finished) {
|
|
updateConnection(ServerConn.loading);
|
|
}
|
|
|
|
List<String>? segments;
|
|
String? raw;
|
|
|
|
try {
|
|
final statusCmd = ShellFunc.status.exec(spi.id, systemType: state.status.system, customDir: spi.custom?.scriptDir);
|
|
// Loggers.app.info('Running status command for ${spi.name} (${state.status.system.name}): $statusCmd');
|
|
final execResult = await state.client?.run(statusCmd);
|
|
if (execResult != null) {
|
|
raw = SSHDecoder.decode(
|
|
execResult,
|
|
isWindows: state.status.system == SystemType.windows,
|
|
context: 'GetStatus<${spi.name}>',
|
|
);
|
|
// Loggers.app.info('Status response length for ${spi.name}: ${raw.length} bytes');
|
|
} else {
|
|
raw = '';
|
|
Loggers.app.warning('No status result from ${spi.name}');
|
|
}
|
|
|
|
if (raw.isEmpty) {
|
|
TryLimiter.inc(sid);
|
|
final newStatus = state.status
|
|
..err = SSHErr(type: SSHErrType.segements, message: 'Empty response from server');
|
|
updateStatus(newStatus);
|
|
updateConnection(ServerConn.failed);
|
|
|
|
final sessionId = 'ssh_${spi.id}';
|
|
TermSessionManager.updateStatus(sessionId, TermSessionStatus.disconnected);
|
|
return;
|
|
}
|
|
|
|
segments = raw.split(ScriptConstants.separator).map((e) => e.trim()).toList();
|
|
if (segments.isEmpty) {
|
|
if (Stores.setting.keepStatusWhenErr.fetch()) {
|
|
// Keep previous server status when error occurs
|
|
if (state.conn != ServerConn.failed && state.status.more.isNotEmpty) {
|
|
return;
|
|
}
|
|
}
|
|
TryLimiter.inc(sid);
|
|
final newStatus = state.status
|
|
..err = SSHErr(type: SSHErrType.segements, message: 'Separate segments failed, raw:\n$raw');
|
|
updateStatus(newStatus);
|
|
updateConnection(ServerConn.failed);
|
|
|
|
final sessionId = 'ssh_${spi.id}';
|
|
TermSessionManager.updateStatus(sessionId, TermSessionStatus.disconnected);
|
|
return;
|
|
}
|
|
} catch (e) {
|
|
TryLimiter.inc(sid);
|
|
final newStatus = state.status..err = SSHErr(type: SSHErrType.getStatus, message: e.toString());
|
|
updateStatus(newStatus);
|
|
updateConnection(ServerConn.failed);
|
|
Loggers.app.warning('Get status from ${spi.name} failed', e);
|
|
|
|
final sessionId = 'ssh_${spi.id}';
|
|
TermSessionManager.updateStatus(sessionId, TermSessionStatus.disconnected);
|
|
return;
|
|
}
|
|
|
|
try {
|
|
// Parse script output into command-specific mappings
|
|
final parsedOutput = ScriptConstants.parseScriptOutput(raw);
|
|
|
|
final req = ServerStatusUpdateReq(
|
|
ss: state.status,
|
|
parsedOutput: parsedOutput,
|
|
system: state.status.system,
|
|
customCmds: spi.custom?.cmds ?? {},
|
|
);
|
|
final newStatus = await Computer.shared.start(getStatus, req, taskName: 'StatusUpdateReq<${spi.id}>');
|
|
updateStatus(newStatus);
|
|
} catch (e, trace) {
|
|
TryLimiter.inc(sid);
|
|
final newStatus = state.status
|
|
..err = SSHErr(type: SSHErrType.getStatus, message: 'Parse failed: $e\n\n$raw');
|
|
updateStatus(newStatus);
|
|
updateConnection(ServerConn.failed);
|
|
Loggers.app.warning('Server status', e, trace);
|
|
|
|
final sessionId = 'ssh_${spi.id}';
|
|
TermSessionManager.updateStatus(sessionId, TermSessionStatus.disconnected);
|
|
return;
|
|
}
|
|
|
|
// Set Server.isBusy to false each time this method is called
|
|
updateConnection(ServerConn.finished);
|
|
// Reset retry count only after successful preparation
|
|
TryLimiter.reset(sid);
|
|
}
|
|
}
|
|
|
|
extension IndividualServerStateExtension on ServerState {
|
|
bool get needGenClient => conn < ServerConn.connecting;
|
|
|
|
bool get canViewDetails => conn == ServerConn.finished;
|
|
|
|
String get id => spi.id;
|
|
}
|