# Concepts & architecture
Fujin Shuttle is the Shopware-side port of the Unirgy Rapidflow import/export engine. If you know Rapidflow, the vocabulary carries over directly; if you don't, this page is the map.
The engine runs embedded in-process, but the ServiceRequest/Backend channel seam from
Rapidflow is preserved so a remote-cluster variant can be layered on later without rewriting
the deputies.
## The Rapidflow vocabulary
| Rapidflow term | Fujin Shuttle class(es) | Role |
|---|---|---|
| Agent | `src/DataType/Agent/*` | The flyweight selected for one (dataType, transferMode) pair. Owns the run. |
| Deputy | `src/Deputy/*` | The concrete worker for a data type; orchestrates Pass A / Pass B. |
| Processor / pipeline | `src/Pipeline/*`, `amp-data-pipeline` | Streams and transforms data items (CSV row → batch). |
| Batch emitter | `BatchEmitter` (in `fujin-shuttle-message`) | Accumulates rows into batches of N. |
| Channel / Backend | `src/Channel/*` | The request/response seam between deputy and the database layer. |
| Request handler | `src/Channel/RequestHandler/*` (47) | Executes one kind of `ServiceRequest`. |
| Depot | `src/DataDepot/*` | The raw bulk-SQL reader/writer for an entity type. |
| ID resolver | `src/IdResolver/Shopware/*` | Resolves codes/names/paths → Shopware UUIDs (and can auto-create). |
## The big picture

                    ┌─────────────────────────────────────────────┐
    CLI / Admin ──► │ ProfileFacade (src/Model)                   │
                    │ builds SystemConfig + Profile + Context     │
                    └───────────────┬─────────────────────────────┘
                                    ▼
                    ┌─────────────────────────────────────────────┐
                    │ AgentContainer → DefaultSelector            │
                    │ picks + clones the Agent for (type, mode)   │
                    └───────────────┬─────────────────────────────┘
                                    ▼
                    ┌─────────────────────────────────────────────┐
                    │ Deputy (src/Deputy)                         │
                    │ Pass A: resolve all referenced codes        │
                    │ Pass B: stream rows → batches               │
                    └───────────────┬─────────────────────────────┘
                                    ▼ ServiceRequest
                    ┌─────────────────────────────────────────────┐
                    │ Backend → HandlerRegistry → RequestHandler  │
                    │ (src/Channel)                               │
                    └───────────────┬─────────────────────────────┘
                                    ▼
                    ┌─────────────────────────────────────────────┐
                    │ Depot (src/DataDepot)                       │
                    │ InsertOnDuplicate → bulk SQL on the         │
                    │ async / sync connection pool                │
                    └─────────────────────────────────────────────┘

## Agents and deputies
An agent is a thin flyweight registered against a (dataType, transferMode) pair (e.g.
product + import). AgentContainer holds the registry; DefaultSelector matches and
clones the agent for each run so per-run state is isolated. The matching agents are wired in
Resources/config/services/agent.xml and the per-entity service files.
The agent delegates the real work to a deputy — the class that knows how a particular data
type is shaped (src/Deputy/ProductImport.php, CategoryImport.php, InventoryImport.php, …).
The deputy is where Pass A / Pass B live.
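To make the flyweight-and-clone idea concrete, here is a minimal Python sketch (the plugin itself is PHP). `Agent`, `AgentContainer.register()`/`get()`, `DefaultSelector.select()` and the `run_state` field are illustrative stand-ins, not the plugin's real signatures. The sketch shows only that the registry keeps one prototype per (dataType, transferMode) pair and that every run works on its own copy.

```python
import copy
from dataclasses import dataclass, field
from typing import Dict, Tuple


@dataclass
class Agent:
    """Hypothetical agent prototype keyed by (data_type, transfer_mode)."""
    data_type: str
    transfer_mode: str
    deputy_name: str                                # which deputy does the real work
    run_state: dict = field(default_factory=dict)   # per-run state, must not leak


class AgentContainer:
    """Registry of prototype agents (illustrative, not the plugin's PHP class)."""

    def __init__(self) -> None:
        self._agents: Dict[Tuple[str, str], Agent] = {}

    def register(self, agent: Agent) -> None:
        self._agents[(agent.data_type, agent.transfer_mode)] = agent

    def get(self, data_type: str, transfer_mode: str) -> Agent:
        return self._agents[(data_type, transfer_mode)]  # KeyError if nothing matches


class DefaultSelector:
    """Matches the (type, mode) pair and hands out a private clone for one run."""

    def __init__(self, container: AgentContainer) -> None:
        self._container = container

    def select(self, data_type: str, transfer_mode: str) -> Agent:
        prototype = self._container.get(data_type, transfer_mode)
        return copy.deepcopy(prototype)  # run-time mutations stay on the clone


container = AgentContainer()
container.register(Agent("product", "import", "ProductImport"))
selector = DefaultSelector(container)

first_run = selector.select("product", "import")
first_run.run_state["rows_written"] = 500
second_run = selector.select("product", "import")
```

`second_run.run_state` is still empty: the first run's counters never reach the registered prototype.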
## The two-pass import model
Most imports run in two passes. Take products as the worked example:
### Pass A: resolve references (sequential)
The deputy scans the CSV once and collects every distinct referenced code:
- category paths (`Home > Furniture > Chairs`)
- sales-channel, manufacturer, delivery-time, unit and tax keys
- property `Group:Option` pairs and custom-field names
- media sources (URLs / paths)
- rule names

Each set is resolved once through an ID resolver
to a code → idBytes map, auto-creating missing targets where the profile allows it
(categories, property options, custom fields). The maps are held on the deputy as run-wide
assets, so Pass B never does a lookup it can avoid.
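The collect-once, resolve-once idea can be sketched in a few lines of Python. The CSV columns, `collect_codes()`, `resolve()` and the `auto_create` flag are hypothetical. The dictionaries stand in for the Shopware tables that the real resolvers query through the channel, and `uuid.uuid4().bytes` stands in for a newly created binary UUID.

```python
import csv
import io
import uuid
from typing import Dict, Iterable, Set

CSV_TEXT = """sku,category,manufacturer
A-1,Home > Furniture > Chairs,Acme
A-2,Home > Furniture > Chairs,Acme
A-3,Home > Lighting,Globex
"""


def collect_codes(rows: Iterable[dict], column: str) -> Set[str]:
    """Pass A scan: the distinct, non-empty codes one column references."""
    return {row[column] for row in rows if row[column]}


def resolve(codes: Set[str], existing: Dict[str, bytes], auto_create: bool) -> Dict[str, bytes]:
    """Resolve each code once to a 16-byte id, creating missing targets if allowed."""
    resolved: Dict[str, bytes] = {}
    for code in sorted(codes):
        if code in existing:
            resolved[code] = existing[code]
        elif auto_create:
            new_id = uuid.uuid4().bytes   # stand-in for a new binary UUID
            existing[code] = new_id       # "create" the missing target
            resolved[code] = new_id
        # otherwise the code is left unresolved (handling is outside this sketch)
    return resolved


rows = list(csv.DictReader(io.StringIO(CSV_TEXT)))
known_categories = {"Home > Furniture > Chairs": uuid.uuid4().bytes}
known_manufacturers = {"Acme": uuid.uuid4().bytes}

# One resolver call per referenced set, not one lookup per row.
category_ids = resolve(collect_codes(rows, "category"), known_categories, auto_create=True)
manufacturer_ids = resolve(collect_codes(rows, "manufacturer"), known_manufacturers, auto_create=False)
```

The file has three rows but only two distinct category paths, so the resolver sees two codes. Pass B then reads `category_ids[...]` instead of issuing a query per row.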
**Why a second physical pass is fine.** Re-reading the file is cheap relative to the write work, and the CLI is the large-file path. Resolving up front lets Pass B run with foreign-key checks off (all targets already exist) and write in big concurrent batches.
### Pass B: write rows (concurrent)
The CSV is streamed again. A reader fiber pushes rows into a bounded queue; a
BatchEmitter accumulates them into batches (flushing at
batch_size or when the row type changes). Each batch is processed by a worker that:
- looks up IDs from the Pass A asset maps,
- builds the entity / translation / category / visibility / price / property / media rows,
- sends one `ServiceRequest` per entity table through the channel,
- ticks the run monitor for progress and checks for a stop request.

Batches run concurrently up to the configured concurrency limit.
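The real `BatchEmitter` is PHP and lives in `fujin-shuttle-message`. The Python generator below mirrors only the two flush rules described above: flush when the batch reaches `batch_size`, and flush when the row type changes. The `(row_type, values)` row shape is a hypothetical simplification.

```python
from typing import Iterable, Iterator, List, Tuple

Row = Tuple[str, dict]  # hypothetical row shape: (row_type, column values)


def emit_batches(rows: Iterable[Row], batch_size: int) -> Iterator[List[Row]]:
    """Yield batches that flush at batch_size or when the row type changes."""
    batch: List[Row] = []
    for row in rows:
        type_changed = bool(batch) and row[0] != batch[-1][0]
        if batch and (len(batch) >= batch_size or type_changed):
            yield batch
            batch = []
        batch.append(row)
    if batch:
        yield batch  # trailing partial batch


rows = [("product", {"sku": f"P{i}"}) for i in range(5)] + [("variant", {"sku": "V1"})]
batches = list(emit_batches(rows, batch_size=2))
sizes = [len(b) for b in batches]  # [2, 2, 1, 1]
```

The lone `P4` row is flushed early because the next row is a `variant`. A batch therefore never mixes row types, so a worker can pick one code path per batch.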
**Ordering hazards stay sequential.** Concurrency is only safe for independent batches (products, inventory). Data with intra-file ordering dependencies (categories: parent → child; entity metadata: group → option; custom-field JSON merges) is processed sequentially regardless of the concurrency setting.
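As a rough illustration of "concurrent unless order matters", this Python sketch uses `asyncio` in place of Revolt fibers. An `asyncio.Semaphore` plays the role of the concurrency limit, and the `asyncio.sleep()` models a batch awaiting the database. The `SEQUENTIAL_TYPES` names and `run_pass_b()` are hypothetical, and the reader fiber and bounded queue are left out.

```python
import asyncio
from typing import List, Tuple

# Hypothetical names for the data types that must keep file order.
SEQUENTIAL_TYPES = {"category", "entity_metadata", "custom_field_merge"}


async def write_batch(batch: int, log: List[Tuple[str, int]]) -> None:
    """Stand-in batch worker; the sleep models awaiting the database."""
    log.append(("start", batch))
    await asyncio.sleep(0.01)
    log.append(("end", batch))


async def run_pass_b(data_type: str, batches: List[int], concurrency: int,
                     log: List[Tuple[str, int]]) -> None:
    if data_type in SEQUENTIAL_TYPES:
        for batch in batches:                 # strict file order: parent before child
            await write_batch(batch, log)
        return
    limit = asyncio.Semaphore(concurrency)    # at most `concurrency` batches in flight

    async def guarded(batch: int) -> None:
        async with limit:
            await write_batch(batch, log)

    await asyncio.gather(*(guarded(b) for b in batches))


sequential_log: List[Tuple[str, int]] = []
concurrent_log: List[Tuple[str, int]] = []
asyncio.run(run_pass_b("category", [1, 2, 3], concurrency=3, log=sequential_log))
asyncio.run(run_pass_b("product", [1, 2, 3], concurrency=3, log=concurrent_log))
```

In the sequential log each batch ends before the next starts. In the concurrent log all three batches start before any of them finishes.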
## The channel seam
Deputies never touch the database directly. They send typed ServiceRequest objects to a
Backend; the local implementation (LocalBackend) looks the request up in a
HandlerRegistry (a compile-time map of request class → handler service) and dispatches
to the matching RequestHandler, returning a typed Response. Exceptions are caught and
returned as an error response, so the seam is wire-compatible with a future remote transport.
Handlers are registered with the fujin_shuttle.request_handler DI tag, keyed by the request
class they handle, so a profile can add handlers without editing shared files.
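The dispatch path can be sketched in Python as follows. The class names `ServiceRequest`, `Response`, `HandlerRegistry` and `LocalBackend` follow the text. The method names (`send`, `handler_for`), the fields, and the `ResolveManufacturerIds` request are hypothetical. A plain dict stands in for the map the plugin builds from the DI tag at container compile time.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Type


@dataclass
class ServiceRequest:
    """Marker base class for typed requests (illustrative)."""


@dataclass
class ResolveManufacturerIds(ServiceRequest):  # hypothetical request type
    codes: List[str]


@dataclass
class Response:
    ok: bool
    payload: Any = None
    error: Optional[str] = None


Handler = Callable[[Any], Any]


class HandlerRegistry:
    """Request class -> handler map, built once up front."""

    def __init__(self, handlers: Dict[Type[ServiceRequest], Handler]) -> None:
        self._handlers = dict(handlers)

    def handler_for(self, request: ServiceRequest) -> Handler:
        return self._handlers[type(request)]  # KeyError for an unregistered class


class LocalBackend:
    """In-process backend: dispatch, and turn any exception into an error Response."""

    def __init__(self, registry: HandlerRegistry) -> None:
        self._registry = registry

    def send(self, request: ServiceRequest) -> Response:
        try:
            handler = self._registry.handler_for(request)
            return Response(ok=True, payload=handler(request))
        except Exception as exc:  # returned as data, never raised across the seam
            return Response(ok=False, error=f"{type(exc).__name__}: {exc}")


def resolve_manufacturers(request: ResolveManufacturerIds) -> Dict[str, str]:
    table = {"Acme": "id-acme"}                           # fake lookup table
    return {code: table[code] for code in request.codes}  # KeyError on unknown codes


backend = LocalBackend(HandlerRegistry({ResolveManufacturerIds: resolve_manufacturers}))
ok_response = backend.send(ResolveManufacturerIds(["Acme"]))
error_response = backend.send(ResolveManufacturerIds(["Globex"]))
```

Because failures come back as ordinary `Response` values, a remote transport could serialize them unchanged. This is the property the "wire-compatible" remark relies on.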
### The 47 request handlers, grouped
`src/Channel/RequestHandler/` holds ~47 handlers, grouped here by naming pattern:
| Group | Pattern | What they do |
|---|---|---|
| Resolve | `Resolve…IdsHandler`, `ResolveCategoryPathsHandler`, `ResolveCustomFieldsHandler` | Pass A: look up / auto-create FK targets, return code → id maps |
| Load | `Load…Handler` | Export reads (products, variants, categories, inventory, rules, metadata) |
| Upsert | `…UpsertHandler` | Pass B: `INSERT … ON DUPLICATE KEY UPDATE` into entity + translation tables |
| Link | `…LinkHandler` | Insert into composite-key link tables (`product_category`, `product_media`, options) |
| Delete | `Delete…Handler` | Clear stale associations / rule conditions before re-import |
| Special | `ProductCustomFieldMergeHandler`, `StockUpdateHandler`, `RuleConditionInsertHandler` | Read-modify-write merges and other one-offs |
## Depots and raw bulk SQL
A depot (`src/DataDepot/<Entity>/ImportDepot.php` / `ExportDepot.php`) is the only place
that talks SQL. On the write side it builds an INSERT … ON DUPLICATE KEY UPDATE with the bulk
writer from flyokai/laminas-db-bulk-update, filtering out columns the live schema doesn't have
(forward/backward version compatibility) and excluding immutable columns (id, version_id,
created_at, …) from the ON DUPLICATE clause. Link tables use INSERT IGNORE.
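The statement shape can be sketched with a hand-rolled Python builder. This is not the API of flyokai/laminas-db-bulk-update. `build_insert()`, the `%s` placeholder style and the example tables are assumptions for illustration, and `IMMUTABLE` contains only the three columns the text names. The sketch applies the same three rules: drop columns the live schema lacks, keep immutable columns out of the update clause, and use `INSERT IGNORE` for link tables.

```python
from typing import Dict, List, Sequence, Set, Tuple

# Columns named in the text as immutable; the real list is longer ("…").
IMMUTABLE = {"id", "version_id", "created_at"}


def build_insert(table: str, rows: Sequence[Dict[str, object]], live_columns: Set[str],
                 ignore: bool = False) -> Tuple[str, List[object]]:
    """Build one multi-row INSERT with %s placeholders and a flat parameter list."""
    # Assumes every row has the same keys as rows[0]; unknown columns are dropped.
    columns = [c for c in rows[0] if c in live_columns]
    column_sql = ", ".join(f"`{c}`" for c in columns)
    row_sql = "(" + ", ".join(["%s"] * len(columns)) + ")"
    values_sql = ", ".join([row_sql] * len(rows))
    params = [row.get(c) for row in rows for c in columns]

    if ignore:  # composite-key link tables: duplicates are simply skipped
        return f"INSERT IGNORE INTO `{table}` ({column_sql}) VALUES {values_sql}", params

    updates = [f"`{c}` = VALUES(`{c}`)" for c in columns if c not in IMMUTABLE]
    if not updates:
        raise ValueError("no mutable columns to update; use ignore=True")
    sql = (f"INSERT INTO `{table}` ({column_sql}) VALUES {values_sql} "
           f"ON DUPLICATE KEY UPDATE {', '.join(updates)}")
    return sql, params


product_sql, product_params = build_insert(
    "product",
    [{"id": b"\x01", "version_id": b"\x02", "stock": 5, "legacy_flag": 1}],
    live_columns={"id", "version_id", "stock", "created_at"},
)
link_sql, _ = build_insert(
    "product_category",
    [{"product_id": b"\x01", "category_id": b"\x03"}],
    live_columns={"product_id", "category_id"},
    ignore=True,
)
```

`legacy_flag` is silently dropped because the schema does not have it. `id` and `version_id` are inserted but never overwritten on a duplicate key.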
On the read (export) side a depot pages through rows with keyset pagination and assembles the wide-column shape (scalars + prices + translations + associations) the export agent emits.
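Keyset pagination, in its general form, is sketched below in Python. An in-memory SQLite table stands in for MySQL, and the two-column `product` table and `iter_pages()` are hypothetical. The real export depot also assembles prices, translations and associations per page. Each page seeks past the last key it saw via the primary-key index, rather than skipping over an ever-growing `OFFSET`.

```python
import sqlite3
from typing import Iterator, List, Optional, Tuple


def iter_pages(conn: sqlite3.Connection, page_size: int) -> Iterator[List[Tuple[bytes, str]]]:
    """Keyset pagination: seek past the last key seen instead of using OFFSET."""
    last_id: Optional[bytes] = None
    while True:
        if last_id is None:
            cur = conn.execute(
                "SELECT id, name FROM product ORDER BY id LIMIT ?", (page_size,))
        else:
            cur = conn.execute(
                "SELECT id, name FROM product WHERE id > ? ORDER BY id LIMIT ?",
                (last_id, page_size))
        page = cur.fetchall()
        if not page:
            return
        yield page
        last_id = page[-1][0]  # the next page starts strictly after this key


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE product (id BLOB PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO product VALUES (?, ?)",
                 [(bytes([i]), f"p{i}") for i in range(1, 8)])
pages = list(iter_pages(conn, page_size=3))
```

Binary keys sort byte-wise, so binary UUID primary keys work with the same `WHERE id > ?` seek.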
## Async, in one paragraph
Pass B runs on the Revolt event loop with a pooled, non-blocking ext-mysqli driver. Each
batch is a fiber; when its INSERT awaits the network the loop runs another batch's fiber, so
up to `concurrency` statements (the configured limit) are in flight on a single PHP thread. Foreign-key checks are set
off once per pooled link (Pass A guarantees the targets exist), and the bulk writer skips
the per-statement FK toggle on async connections; both are documented wins in
Performance & tuning. The sync/PDO driver (dbDriver = sync) is available as
a fallback and is byte-identical on output.
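"Once per pooled link" means the session setting rides on the connection, not on each statement. The Python sketch below models that pattern with a fake connection that only records SQL. `Pool`, `FakeLink` and their methods are hypothetical and do not reflect the plugin's ext-mysqli pool. `SET FOREIGN_KEY_CHECKS = 0` is the standard MySQL session statement.

```python
from typing import List

FK_OFF = "SET FOREIGN_KEY_CHECKS = 0"


class FakeLink:
    """Stand-in for one database connection; it just records statements."""

    def __init__(self) -> None:
        self.statements: List[str] = []

    def query(self, sql: str) -> None:
        self.statements.append(sql)


class Pool:
    """Tiny pool that runs a session-init statement once per new link."""

    def __init__(self, max_links: int, init_sql: str = FK_OFF) -> None:
        self._max_links = max_links
        self._init_sql = init_sql
        self._idle: List[FakeLink] = []
        self.created: List[FakeLink] = []

    def acquire(self) -> FakeLink:
        if self._idle:
            return self._idle.pop()            # reused link: session already initialised
        if len(self.created) >= self._max_links:
            raise RuntimeError("pool exhausted")  # a real pool would wait instead
        link = FakeLink()
        link.query(self._init_sql)             # once per link, not once per statement
        self.created.append(link)
        return link

    def release(self, link: FakeLink) -> None:
        self._idle.append(link)


pool = Pool(max_links=2)
for n in range(5):                             # five batches, one at a time
    link = pool.acquire()
    link.query(f"INSERT INTO product_stock VALUES ({n})")
    pool.release(link)
a, b = pool.acquire(), pool.acquire()          # two batches in flight at once
```

Five sequential batches share one link and one `SET`. A second link appears only when two batches are in flight, and it pays the `SET` exactly once as well.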
**Sync vs async driver.** dbDriver (admin config) picks the connection pool. Async (default) is faster on a
networked database, where round-trip latency is hidden across concurrent batches. On a
localhost database the insert path is MySQL-bound and concurrency barely matters; see the
flat sweep in Performance.
## Tracking a run
CLI ad-hoc runs (`fujin:shuttle:run`) are untracked. Tracked runs (the admin Run
button, a queued `RunProfileMessage`, and `fujin:shuttle:run-profile`) flow through a
run monitor that:

- writes throttled progress (`processed`, `progress`, `phase`) and a `heartbeat_at` liveness stamp to the `fujin_job` row,
- reads the job status, so flipping it to `stopping` (the admin Stop button) makes the loop break cooperatively at the next batch boundary; rows already written are kept,
- lets a duplicate-run guard refuse a second concurrent run of the same profile (a stale heartbeat reaps an abandoned job instead of blocking forever).

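The three monitor duties can be sketched in Python against an in-memory dict standing in for `fujin_job` rows. The `processed`, `phase` and `heartbeat_at` columns and the `stopping` status come from the text. The `running`/`stopped`/`abandoned` statuses, `claim_run()`, `RunMonitor`, the throttle interval and the stale threshold are hypothetical, and the `progress` column is omitted.

```python
from typing import Callable, Dict, Optional

Job = Dict[str, object]


def claim_run(jobs: Dict[int, Job], profile_id: str, now: float,
              stale_after: float) -> Optional[int]:
    """Duplicate-run guard: refuse a live run of the same profile, reap stale ones."""
    for job in jobs.values():
        if job["profile_id"] == profile_id and job["status"] in ("running", "stopping"):
            if now - job["heartbeat_at"] < stale_after:
                return None                  # a live run holds the profile
            job["status"] = "abandoned"      # heartbeat went stale: reap it
    job_id = max(jobs, default=0) + 1
    jobs[job_id] = {"profile_id": profile_id, "status": "running",
                    "processed": 0, "phase": "start", "heartbeat_at": now}
    return job_id


class RunMonitor:
    """Throttled progress writer plus cooperative stop check for one job."""

    def __init__(self, jobs: Dict[int, Job], job_id: int, throttle: float,
                 clock: Callable[[], float]) -> None:
        self.jobs, self.job_id = jobs, job_id
        self.throttle, self.clock = throttle, clock
        self._last_write: Optional[float] = None

    def tick(self, processed: int, phase: str) -> bool:
        """Called at each batch boundary; returns False once a stop was requested."""
        now = self.clock()
        job = self.jobs[self.job_id]
        if self._last_write is None or now - self._last_write >= self.throttle:
            job.update(processed=processed, phase=phase, heartbeat_at=now)
            self._last_write = now
        return job["status"] != "stopping"


jobs: Dict[int, Job] = {}
first = claim_run(jobs, "p1", now=0.0, stale_after=60.0)
second = claim_run(jobs, "p1", now=10.0, stale_after=60.0)   # refused: first is live

fake_time = [0.0]
monitor = RunMonitor(jobs, first, throttle=5.0, clock=lambda: fake_time[0])
written = 0
for batch_no in range(10):
    fake_time[0] = float(batch_no)           # one "second" per batch
    written += 100                           # the batch's rows are committed
    if batch_no == 6:
        jobs[first]["status"] = "stopping"   # admin pressed Stop
    if not monitor.tick(written, "write"):
        break                                # rows already written stay written
jobs[first]["status"] = "stopped"

crashed = claim_run(jobs, "p2", now=0.0, stale_after=60.0)    # never ticks again
retry = claim_run(jobs, "p2", now=500.0, stale_after=60.0)    # reaps the stale job
```

The loop stops after 700 rows, but the stored `processed` value is 600 because the last write falls inside the throttle window. This is the trade-off the throttle makes: fewer `fujin_job` writes in exchange for slightly stale progress.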
Job lifecycle and the exact columns are in Database schema.
## Source-tree map
| Path | Contents |
|---|---|
| `src/Command/` | The four CLI commands |
| `src/Administration/Controller/` | Admin REST controller |
| `src/Model/` | `ProfileFacade`, `ProfileStore`, run monitor |
| `src/DataType/` + `Agent/` | Agent registry, selector, agents |
| `src/Deputy/` | Per-type deputies (Pass A / Pass B logic) |
| `src/Channel/` + `RequestHandler/` | Backend seam, handler registry, connection pools, 47 handlers |
| `src/DataDepot/` | Per-entity bulk SQL depots |
| `src/IdResolver/Shopware/` | Code/name/path → UUID resolvers |
| `src/Pipeline/Common/` | `ColumnSpec` (header classifier) and pipeline glue |
| `src/Template/` | `TemplateCatalog`: downloadable templates/examples |
| `src/Ownership/` | Ownership guard seam (marketplace per-seller scoping) |
| `src/Migration/` | The four plugin tables + later column adds |
| `src/Resources/config/` | `config.xml`, `routes.xml`, `services/*.xml` |
| `src/Resources/app/administration/` | The admin Vue module |